Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11273 | 04-15-2020 05:01 PM |
| | 7167 | 10-15-2019 08:12 PM |
| | 3155 | 10-12-2019 08:29 PM |
| | 11612 | 09-21-2019 10:04 AM |
| | 4393 | 09-19-2019 07:11 AM |
05-15-2018
12:58 PM
@vinayak krishnan I think you are using Packaging Format "zip" in your second UnpackContent processor, i.e. the one after the MergeContent processor. The MergeContent processor adds a mime.type attribute with a value such as application/flowfile-v3, etc., based on the Merge Format property value in the MergeContent processor. In the second UnpackContent processor we need to set the Packaging Format to either
use mime.type attribute
(or)
flowfile-stream-v3 //change the value based on the Merge Format property value in the MergeContent processor
The flowfile-v3 merge format generates a .pkg flowfile from the MergeContent processor; when we specify zip as the packaging format the processor is not able to unpack the contents because of the conflict in packaging formats. If you are still having issues please attach screenshots of the flow and processor configs, that will help find the root cause.
05-15-2018
11:54 AM
@Alex M Use the below expression language to convert into a date:
${<attribute-name>:toDate("MMM dd yyyy hh:mm:ss","GMT"):format("yyyy-MM-dd HH:mm:ss")}
In the above expression we parse the input date using the toDate function, and with the format function we format the value as yyyy-MM-dd HH:mm:ss. To get only the date, use the format function with just the date part, i.e. yyyy-MM-dd:
${<attribute-name>:toDate("MMM dd yyyy hh:mm:ss","GMT"):format("yyyy-MM-dd")}
Example: I have a GenerateFlowFile processor with a new property dr added.
UpdateAttribute Configs:
Added two properties in the UpdateAttribute processor as below
gmt
${dr:toDate("MMM dd yyyy hh:mm:ss","GMT"):format("yyyy-MM-dd HH:mm:ss")}
not_gmt
${dr:toDate("MMM dd yyyy hh:mm:ss"):format("yyyy-MM-dd HH:mm:ss")}
Output Flowfile attributes:
In addition we can use a timezone argument with the format function as well. For more details regarding date conversions refer to this link.
- If the answer addressed your question, click the Accept button below to accept the answer; that helps community users find solutions quickly for these kinds of issues.
05-15-2018
12:00 AM
1 Kudo
@vinayak krishnan The UnpackContent processor recursively unpacks all the files from the .zip file, i.e. we are not going to get test_folder. But the UnpackContent processor adds a path attribute whose value describes the actual path, in your case test/test_folder. If you want to create an intermediate package that holds all of those files, use a MergeContent processor with Merge Format as FlowFile Stream, v3 and Correlation Attribute Name as path. By using the MergeContent processor we create a .pkg flowfile which has the individual files packaged together based on the correlation attribute. If you want to change the filename, use an UpdateAttribute processor after the MergeContent processor
filename
${path:substringAfterLast("/")}
for windows
${path:substringAfterLast('${literal("\")}')}
Now we are updating the .pkg name to the actual folder name inside the .zip file. If you want to uncompress/unpack these .pkg flowfiles, use an UnpackContent processor with Packaging Format
use mime.type attribute //as this attribute is added by the MergeContent processor
Now you will have all the files unpacked from the .pkg flowfile. By using the MergeContent processor we can create intermediate .pkg flowfiles and then unpack them whenever needed in your flow.
Sample Flow:
Reference flow.xml 191223-uncompress.xml
- If the answer addressed your question, click the Accept button below to accept the answer; that helps community users find solutions quickly for these kinds of issues.
05-14-2018
11:55 AM
8 Kudos
Article
Short Description:
This tutorial describes how to add a partition field to the content of the flowfile, create dynamic partitions based on the flowfile content, and store the data into an HDFS directory using NiFi.
PartitionRecord Processor Params:
Record Reader :
Specifies the Controller Service to use for reading incoming data
Record Writer:
Specifies the Controller Service to use for writing out the records.
Dynamic Properties:
We need to add at least one dynamic property whose name becomes an attribute name and whose value is a RecordPath that will be evaluated against each record of the flowfile.
Each record will then be grouped with the other 'like' records in the group.
Note:
No attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).
Flow:
Flow Overview:
1. GenerateFlowFile //generate csv data
2. UpdateAttribute //add schema.name attribute to the flowfile
3. UpdateRecord //add partition field to the flowfile content
4. PartitionRecord //create partitions based on flowfile content
5. ConvertAvroToORC //convert avro format data into ORC format
6. UpdateAttribute //change the filename to a unique id (UUID)
7. PutHDFS //store the data into HDFS based on the partition attribute value
8. ReplaceText //replace the content of the flowfile with Hive DDL and MSCK REPAIR TABLE statements
9. PutHiveQL //execute the hive statements
10. PutEmail //send an e-mail to the configured recipients for each incoming failure flowfile
Flow Explanation:
1.GenerateFlowFile Configs:
Configure the processor as shown below
Sample data data.txt
2.UpdateAttribute Configurations:
Configure the processor as shown below
Added a schema.name attribute to the flowfile so that we can use this name in our AvroSchemaRegistry and use the same AvroSchemaRegistry for the Record Reader/Writer controller services.
3.UpdateRecord Configs:
Record Reader:
Our input data is in CSV format, so configure the CSVReader controller service as shown below.
As our data does not have a header, I'm defining the schema in AvroSchemaRegistry. If your incoming data has a header, set the Treat First Line as Header property to true and the Schema Access Strategy to Use String Fields From Header (NiFi will assign string type to all the header fields); by using these two properties the schema is generated dynamically.
Record Writer:
I'm using AvroRecordSetWriter to convert the CSV data into Avro, and then the ConvertAvroToORC processor to store ORC files into HDFS.
Configure the controller service as shown below
Set the Schema Write Strategy property value to Embed Avro Schema //we are embedding the Avro schema into the content of the flowfile.
AvroSchemaRegistry:
In this controller service we define the shared Avro schema used by both the reader and writer controllers in this flow.
Keep the Validate Field Names property value as true //if you have "-" in the field names, set this property to false so the controller service won't validate the field names.
Add a new property as shown in the below screenshot
sch
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","int"] },
{ "name": "first_name", "type": ["null","string"]},
{ "name": "last_name", "type": ["null","string"]},
{ "name": "email", "type": ["null","string"]},
{ "name": "gender", "type": ["null","string"]},
{ "name": "joined_date", "type": ["null","string"]},
{ "name": "partition_dt", "type": ["null","string"]}
]
}
In this Avro schema I'm allowing null values for all the fields; if you don't want to allow null values for specific fields, keep the type as just string/int without the "null" option in the union.
Then the processor will route the flowfile to the failure relationship if a field value is null. Alternatively, we can use the ValidateRecord processor to validate the contents of the flowfile against the given schema.
Enable AvroSchemaRegistry and all the referenced controller services.
Add a new property in the UpdateRecord processor as shown in the below screenshot
/partition_dt
format(toDate( /joined_date, "yyyy-MM-dd HH:mm:ss"),"yyyy-MM-dd")
We are adding a partition_dt column to the content of the flowfile, derived from the joined_date field value in the format yyyy-MM-dd.
Please refer to this link for more details regarding the RecordPath Domain-Specific Language.
4.PartitionRecord Configs:
Record Reader
Configure/enable AvroReader controller service as shown below
We are using the Schema Access Strategy property value Use Embedded Avro Schema //as the incoming Avro file will have the schema embedded in it.
Record Writer
Configure/enable the AvroRecordSetWriter controller service as shown below
Add new dynamic property to Partition Record processor
partition_dt
/partition_dt
Now the processor reads the incoming data and partitions the records based on the partition_dt value.
One output flowfile is generated for each unique partition value present in the flowfile content.
Each flowfile will have a partition_dt attribute whose value describes which partition the flowfile belongs to.
Output Flowfile attributes:
The processor added a partition_dt attribute with the value 2018-05-10 and a record.count of 357 (the number of records in this flowfile).
5.ConvertAvroToORC Configs:
Keep the processor configs at their defaults unless you need to set the ORC configuration resources property.
6.UpdateAttribute Configs:
In this processor we change the filename to a unique value.
7.PutHDFS Configs:
Configure/enable the processor based on your NiFi instance setup (Kerberos principal/keytab, etc.)
Directory
/user/shu/employee/partition_dt=${partition_dt}
With this expression the Hive table is created at the /user/shu/employee location with partition_dt as the partition column name.
Based on the partition_dt attribute value we dynamically create new directories (or) store files into existing directories.
Check for the newly created sub-directories in the employee directory
[bash$] hdfs dfs -ls /user/shu/employee/
Found 3 items
drwxr-xr-x - nifi hdfs 0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09
drwxr-xr-x - nifi hdfs 0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10
drwxr-xr-x - nifi hdfs 0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-11
Check the newly created files in each sub-directory of the employee directory
[bash$] hdfs dfs -ls -h -r /user/shu/employee/*
Found 1 items
-rw-r--r-- 3 nifi hdfs 18.7 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09/da5c645a-36d4-415e-9920-467314b31900.orc
Found 1 items
-rw-r--r-- 3 nifi hdfs 21.3 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10/121db330-7a61-4b8e-a1ee-5cb1692c6c22.orc
Found 1 items
-rw-r--r-- 3 nifi hdfs 19.8 K 2018-05-13 10:14 /user/shu/employee/partition_dt=2018-05-11/b3e15fec-b78c-4f0b-ab0b-3fa89d057a9b.orc
8.ReplaceText Configs:
Configure the processor as shown below.
In this processor we supply a CREATE TABLE IF NOT EXISTS statement; because we need to create a partitioned table, we are not using the hive.ddl attribute that was added by the ConvertAvroToORC processor.
Each time we add a new file into the HDFS directory, Hive doesn't know about the newly added files; we need to run MSCK REPAIR TABLE (or) ALTER TABLE ... ADD PARTITION (<partition-spec>) LOCATION '<location>' statements to update the Hive metastore with the newly added information, as sketched below.
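For illustration only (the actual statements are in the referenced flow.xml; the EXTERNAL keyword and the default database below are assumptions on my part), the flowfile content produced by ReplaceText could look roughly like this, with the two statements separated by the PutHiveQL Statement Delimiter (;):
-- assumption: an external table over the directory PutHDFS writes to; adjust names/types to your data
CREATE EXTERNAL TABLE IF NOT EXISTS default.employee (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  gender STRING,
  joined_date STRING
)
PARTITIONED BY (partition_dt STRING)
STORED AS ORC
LOCATION '/user/shu/employee';
-- make the newly added partition directories visible to the Hive metastore
MSCK REPAIR TABLE default.employee;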
9.PutHiveQL Configs:
Configure/enable Hive connection pool
This processor executes the statements found in the flowfile content; when the content has more than one statement, the processor separates the statements based on the Statement Delimiter value.
Rollback On Failure
true //failed flowfiles stay in the incoming connection without being penalized and are retried repeatedly until they are processed successfully. It is important to set an adequate 'Yield Duration' to avoid retrying too frequently.
false (default) //based on the error type, flowfiles are routed to either the failure (or) retry relationship and the processor continues with the next flowfile.
Depending on the number of records you are getting in each flowfile, use a SplitRecord processor to split millions of records into smaller chunks to avoid out-of-memory issues.
If we end up with small files in the partitions then we need to run compaction on them: copy the specific partition that has a lot of small files to a temp table, make sure that while compacting no new data is being written to this partition/directory in HDFS, and overwrite the same partition with a smaller number of files, as in the sketch below.
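A minimal sketch of that compaction, assuming an illustrative temp table named employee_tmp and the 2018-05-10 partition (the table and partition names here are only examples):
-- copy the partition that has many small files into a temp table
CREATE TABLE default.employee_tmp AS
SELECT id, first_name, last_name, email, gender, joined_date
FROM default.employee
WHERE partition_dt = '2018-05-10';
-- with nothing writing to this partition, overwrite it so it is rewritten with fewer files
INSERT OVERWRITE TABLE default.employee PARTITION (partition_dt = '2018-05-10')
SELECT id, first_name, last_name, email, gender, joined_date
FROM default.employee_tmp;
DROP TABLE default.employee_tmp;
The number of files produced then depends on your Hive execution engine and reducer settings.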
Check the Hive table partitions and data
hive> show partitions default.employee;
+--------------------------+--+
| partition |
+--------------------------+--+
| partition_dt=2018-05-09 |
| partition_dt=2018-05-10 |
| partition_dt=2018-05-11 |
+--------------------------+--+
3 rows selected (0.488 seconds)
hive> select * from default.employee limit 10;
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| id | first_name | last_name | email | gender | joined_date | partition_dt |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| 1 | Cordy | Dupxx | cdupey0@example.com | Male | 2018-05-09 12:56:15 | 2018-05-09 |
| 2 | Trumann | Hindxxxx | thindmoor1@example.com | Male | 2018-05-09 06:45:17 | 2018-05-09 |
| 7 | Stu | Manxxm | smanus6@example.net | Male | 2018-05-09 08:54:13 | 2018-05-09 |
| 8 | Petey | Petxxx | ppetofi7@example.edu | Male | 2018-05-09 02:04:42 | 2018-05-09 |
| 11 | Marilyn | Brodxxx | mbrodeaua@example.com | Female | 2018-05-09 21:47:17 | 2018-05-09 |
| 14 | Gibby | Branxxxx | gbrandinod@example.com | Male | 2018-05-09 10:52:04 | 2018-05-09 |
| 16 | Friederike | Schwxxxx | fschwartzf@example.gov | Female | 2018-05-09 02:42:17 | 2018-05-09 |
| 19 | Patrizius | Hallexxxxx | phalleybonei@example.net | Male | 2018-05-09 19:16:25 | 2018-05-09 |
| 20 | Chic | Ecclesxxxx | ceccleshallj@example.com | Male | 2018-05-09 16:39:13 | 2018-05-09 |
| 25 | Annabelle | Dwerryhxxxx | adwerryhouseo@example.com. | Female | 2018-05-09 16:20:31 | 2018-05-09 |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
Now we have converted the CSV data to ORC format, dynamically partitioned the records, stored them to HDFS from NiFi, and read the data from the Hive table.
All the failure relationships are fed to a funnel and then to the PutEmail processor, which notifies the configured recipients for each incoming failure flowfile.
Please refer to this and this link for more details regarding the UpdateRecord processor.
Please refer to this link for the NiFi Expression Language.
Reference flow.xml
create-dynamic-partitions-in-nifi.xml
05-14-2018
03:46 AM
@Ryan A I think the issue is an incompatible Java version; use Java 8. If you haven't set JAVA_HOME, set it in your environment variables with a path like "C:/Program Files/jdk1.8". There is a Jira tracking NiFi running on Java 9, and the issue is not resolved yet: https://issues.apache.org/jira/browse/NIFI-4419
05-12-2018
01:10 PM
@adam chui
We are not able to execute piped shell commands in the ExecuteStreamCommand processor such as [bash] cat <filename> | grep <search string> (or) [bash] ls | wc -l. But you can use the QueryRecord processor and write a SQL query (to filter, count, etc.); the query is then executed against the contents of the flowfile, as in the sketch below. Take a look at this link for more details regarding the QueryRecord processor.
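For example (a sketch only; the relationship names filtered and line_count and the column name message are made-up placeholders, and the Record Reader/Writer controller services still need to be configured), you could add dynamic properties to QueryRecord such as:
filtered
SELECT * FROM FLOWFILE WHERE message LIKE '%<search string>%'
line_count
SELECT COUNT(*) AS total FROM FLOWFILE
Each property name becomes a relationship, and the records returned by that query are routed to it.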
05-11-2018
01:18 PM
1 Kudo
@adam chui
By using the ExecuteStreamCommand processor we can pass the directory path that needs to be deleted as an argument, and by using the rm -rf command we can delete the folder and everything inside it.
Example: I have a del_nifi directory in the /tmp directory and nifi has access to delete this folder
[bash tmp]$ ll
drwxr-xr-x 2 nifi hadoop 29 May 11 09:04 del_nifi
[bash tmp]$ cd del_nifi
[bash del_nifi]$ ll
-rw-r--r-- 1 nifi hadoop 20 May 11 09:04 6728259595979699
Flow:
GenerateFlowFile Configs:
Added a new property called directory with the value /tmp/del_nifi.
ExecuteStreamCommand Configs:
Command Arguments
-rf;${directory} //we are passing the -rf flag and the directory attribute as arguments to the rm command
Command Path
rm
Now the ExecuteStreamCommand processor will delete the directory and all subdirectories/files inside it.
- More info regarding the rm command:
[bash tmp]$ rm --help
-r, -R, --recursive remove directories and their contents recursively
-f, --force ignore nonexistent files and arguments, never prompt
05-11-2018
12:13 PM
@Naeem Ullah Khan By using record-oriented processors you can ignore the header in the CSV file and define your own schema to read the incoming file.
Example: Use a ConvertRecord processor with Record Reader as CSVReader, and in the CSVReader controller service keep the below property values:
Treat First Line as Header
true
Ignore CSV Header Column Names
true
With these property values we treat the first line as a header and ignore the header column names. Define your Avro schema for the incoming CSV file; with this setting you are able to parse the incoming file. Use CSVRecordSetWriter as the Record Writer and refer to the same AvroSchemaRegistry for the writer as well (if you need all the columns in the output flowfile). Keep the below property set to true:
Include Header Line
true
Each CSV file will get a new header matching the Avro schema. Refer to this link on how to configure the ConvertRecord processor; there are a bunch of other articles regarding configuration of record-oriented processors.
05-11-2018
10:53 AM
2 Kudos
@Naeem Ullah Khan You can use the ReplaceText processor with Replacement Strategy as Prepend and put your custom header in the Replacement Value property. If your file size is more than 1 MB then you need to increase the Maximum Buffer Size according to your flowfile size. With this config each flowfile will have the custom header added as its first line and all the original content will follow from the second line.
- If the answer helped to resolve your issue, click the Accept button below to accept the answer; that helps community users find solutions quickly for these kinds of issues.
05-11-2018
12:43 AM
1 Kudo
@vivek jain Without using a JOLT transform you can achieve the same expected output with EvaluateJsonPath and SplitJson processors.
Example: I have used your input JSON in a GenerateFlowFile processor to test out this flow.
EvaluateJsonPath Configs:
Change the below property value
Destination
flowfile-attribute
Add new properties to the processor
resource_id
$.resource.id
resource_name
$.resource.name
Now we extract the id and name values and assign them to the resource_id and resource_name attributes.
SplitJson Configs:
As you have a data array, split the array using the SplitJson processor. Configure the processor as below
JsonPath Expression
$.data
EvaluateJsonPath Configs:
Once we have split the data array we will have 2 flowfiles with the same resource_id and resource_name attributes. Change the below property value
Destination
flowfile-attribute
Add new properties as
measurement_key
$.measurement.key
measurement_timestamp
$.timestamp
measurement_value
$.measurement.value
value
$.value
Now we extract all those values and keep them as attributes on the flowfile.
AttributesToJSON processor:
Use this processor to prepare the required message
Attributes List
resource_id,resource_name,measurement_key,measurement_value,measurement_timestamp,value
Now we will have 2 flowfiles that you can feed to the PutHBaseJson processor, because PutHBaseJson expects one JSON message at a time; use some unique field (a combination of attributes, ${UUID()}, etc.) as the rowkey value so that you don't overwrite existing data in HBase. If you want to merge them into one, use a MergeContent processor with Defragment as the Merge Strategy and prepare a valid JSON array of the two messages, so that you can use the PutHBaseRecord processor to process chunks of messages at a time. I have attached my sample flow.xml; save/upload it and make changes as per your needs: flatten-json-191073.xml
- If the answer addressed your question, click the Accept button below to accept the answer; that helps community users find solutions quickly for these kinds of issues.