Created on 05-14-2018 11:55 AM - edited on 03-11-2020 11:15 AM by SumitraMenon
Short Description:
This tutorial describes how to add a partition field to the content of a flowfile, create dynamic partitions based on flowfile content, and store the data into an HDFS directory using NiFi.
PartitionRecord Processor Properties:
Record Reader:
Specifies the Controller Service to use for reading incoming data.
Record Writer:
Specifies the Controller Service to use for writing out the records.
Dynamic Properties:
The name of each dynamic property becomes a flowfile attribute, and its value is a RecordPath that is evaluated against each record; records that evaluate to the same values are grouped into the same outgoing flowfile, which carries the attribute set to that value.
Flow Overview:
1. GenerateFlowFile // generate CSV data
2. UpdateAttribute // add a schema.name attribute to the flowfile
3. UpdateRecord // add the partition field to the flowfile content
4. PartitionRecord // create partitions based on flowfile content
5. ConvertAvroToORC // convert Avro data into ORC format
6. UpdateAttribute // change the filename to a unique ID (UUID)
7. PutHDFS // store the data in HDFS based on the partition attribute value
8. ReplaceText // replace the flowfile content with Hive DDL and MSCK REPAIR TABLE statements
9. PutHiveQL // execute the Hive statements
10. PutEmail // send an e-mail to the configured recipients for each incoming failure flowfile
Flow Explanation:
1. GenerateFlowFile Configs:
Configure the processor as shown below.
Sample data (data.txt):
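The original screenshot of the sample data is not reproduced here; the rows below are a small illustrative sample, reconstructed from the Hive output later in this article, of the comma-separated data without a header (columns: id, first_name, last_name, email, gender, joined_date):
1,Cordy,Dupxx,cdupey0@example.com,Male,2018-05-09 12:56:15
2,Trumann,Hindxxxx,thindmoor1@example.com,Male,2018-05-09 06:45:17
7,Stu,Manxxm,smanus6@example.net,Male,2018-05-09 08:54:13
8,Petey,Petxxx,ppetofi7@example.edu,Male,2018-05-09 02:04:42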
2. UpdateAttribute Configs:
Configure the processor as shown below.
We add a schema.name attribute to the flowfile so that we can reference this name in our AvroSchemaRegistry and reuse the same AvroSchemaRegistry for the Record Reader/Writer controller services.
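Since the screenshot is not reproduced, the added attribute would look like the following, assuming the schema is registered under the name sch as in the AvroSchemaRegistry section below:
schema.name = sch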
3. UpdateRecord Configs:
Record Reader:
Our input data is in CSV format, so configure the CSVReader controller service as shown below.
Since the data has no header line, I'm defining the schema in the AvroSchemaRegistry.
If your incoming data has a header, you can set the Treat First Line as Header property to true and the Schema Access Strategy to Use String Fields From Header (NiFi will assume string type for all header fields); with these two properties the schema is generated dynamically.
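As a rough sketch of the CSVReader settings implied above (the exact screenshot is not reproduced; these are standard CSVReader properties):
Schema Access Strategy = Use 'Schema Name' Property
Schema Registry = AvroSchemaRegistry
Schema Name = ${schema.name}
Treat First Line as Header = false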
Record Writer:
I'm using AvroRecordSetWriter to convert the CSV data into Avro, then the ConvertAvroToORC processor to store ORC files into HDFS.
Configure the controller service as shown below.
Schema Write Strategy property value: Embed Avro Schema // the Avro schema is embedded into the content of the flowfile.
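A sketch of the AvroRecordSetWriter settings consistent with the above (standard AvroRecordSetWriter properties; the screenshot is not reproduced):
Schema Write Strategy = Embed Avro Schema
Schema Access Strategy = Use 'Schema Name' Property
Schema Registry = AvroSchemaRegistry
Schema Name = ${schema.name}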
AvroSchemaRegistry:
Add a new property, as shown in the screenshot below:
sch
{ "namespace": "nifi", "name": "person", "type": "record", "fields": [ { "name": "id", "type": ["null","int"] }, { "name": "first_name", "type": ["null","string"]}, { "name": "last_name", "type": ["null","string"]}, { "name": "email", "type": ["null","string"]}, { "name": "gender", "type": ["null","string"]}, { "name": "joined_date", "type": ["null","string"]}, { "name": "partition_dt", "type": ["null","string"]} ] }
Add a new property in the UpdateRecord processor, as shown in the screenshot below:
/partition_dt
format(toDate( /joined_date, "yyyy-MM-dd HH:mm:ss"),"yyyy-MM-dd")
We add a partition_dt column to the content of the flowfile, derived from the joined_date field value and formatted as yyyy-MM-dd.
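For example, for the first record of the sample data, toDate parses joined_date with the "yyyy-MM-dd HH:mm:ss" pattern and format re-renders it as a date-only string:
joined_date = 2018-05-09 12:56:15  ->  partition_dt = 2018-05-09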
Please refer to this link for more details regarding the RecordPath Domain-Specific Language.
4. PartitionRecord Configs:
Record Reader:
Configure/enable the AvroReader controller service as shown below.
We set the Schema Access Strategy property value to Use Embedded Avro Schema // the incoming Avro file has the schema embedded in it.
Record Writer:
Configure/enable the AvroRecordSetWriter controller service as shown below.
Add a new dynamic property to the PartitionRecord processor:
partition_dt
/partition_dt
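With this property, records are grouped by the evaluated value of /partition_dt, and each group is written out as its own flowfile carrying a partition_dt attribute. Sketching the behavior for input records spanning three dates (consistent with the HDFS listing later in this article):
input: records with partition_dt values 2018-05-09, 2018-05-10, 2018-05-11
output: flowfile 1 (attribute partition_dt=2018-05-09), flowfile 2 (attribute partition_dt=2018-05-10), flowfile 3 (attribute partition_dt=2018-05-11)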
Output Flowfile Attributes:
The processor adds a partition_dt attribute with value 2018-05-10 and a record.count attribute of 357 (the number of records in this flowfile).
5. ConvertAvroToORC Configs:
Keep the processor configs at their defaults unless you need to set the ORC configuration resources property value.
6. UpdateAttribute Configs:
In this processor we change the filename to a unique value, as sketched below.
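The screenshot is not reproduced; the attribute would look something like this, where ${UUID()} is the Expression Language function that returns a unique identifier, matching the .orc file names in the HDFS listing below:
filename = ${UUID()}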
7. PutHDFS Configs:
Configure/enable the processor based on your NiFi instance setup (Kerberos principal/keytab, etc.).
Directory
/user/shu/employee/partition_dt=${partition_dt}
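With the Expression Language in the Directory property, a flowfile carrying the attribute partition_dt=2018-05-10, for example, resolves to its own partition sub-directory:
/user/shu/employee/partition_dt=${partition_dt}  ->  /user/shu/employee/partition_dt=2018-05-10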
Check for the newly created sub-directories in the employee directory:
[bash$] hdfs dfs -ls /user/shu/employee/
Found 3 items
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-11
Check the newly created files in each sub-directory of the employee directory:
[bash$] hdfs dfs -ls -h -r /user/shu/employee/*
Found 1 items
-rw-r--r--   3 nifi hdfs     18.7 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09/da5c645a-36d4-415e-9920-467314b31900.orc
Found 1 items
-rw-r--r--   3 nifi hdfs     21.3 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10/121db330-7a61-4b8e-a1ee-5cb1692c6c22.orc
Found 1 items
-rw-r--r--   3 nifi hdfs     19.8 K 2018-05-13 10:14 /user/shu/employee/partition_dt=2018-05-11/b3e15fec-b78c-4f0b-ab0b-3fa89d057a9b.orc
8. ReplaceText Configs:
Configure the processor as shown below.
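The exact statements were shown in a screenshot that is not reproduced here; below is a minimal sketch of a Replacement Value consistent with the default.employee table queried later in this article (table, columns, and location are taken from this article; adjust to your environment). Note that ConvertAvroToORC also writes a hive.ddl attribute containing a generated, non-partitioned CREATE TABLE statement, which can be referenced as ${hive.ddl}; since we need a partitioned table here, the DDL is written out explicitly:
CREATE EXTERNAL TABLE IF NOT EXISTS default.employee (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  gender STRING,
  joined_date STRING
)
PARTITIONED BY (partition_dt STRING)
STORED AS ORC
LOCATION '/user/shu/employee';
MSCK REPAIR TABLE default.employee;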
9. PutHiveQL Configs:
Configure/enable the Hive connection pool.
This processor executes the statements contained in the flowfile; when the flowfile contains more than one statement, the processor splits the content on the Statement Delimiter value.
Rollback On Failure:
True // failed flowfiles stay in the incoming connection without being penalized and are reprocessed repeatedly until they succeed. It is important to set an adequate Yield Duration to avoid retrying too frequently.
False (default) // based on the error type, flowfiles are routed to either the failure or retry relationship, and the processor continues with the next flowfile.
Check the Hive table's partitions and data:
hive> show partitions default.employee;
+--------------------------+--+
|        partition         |
+--------------------------+--+
| partition_dt=2018-05-09  |
| partition_dt=2018-05-10  |
| partition_dt=2018-05-11  |
+--------------------------+--+
3 rows selected (0.488 seconds)
hive> select * from default.employee limit 10;
+-----+-------------+--------------+-----------------------------+---------+----------------------+---------------+--+
| id  | first_name  | last_name    | email                       | gender  | joined_date          | partition_dt  |
+-----+-------------+--------------+-----------------------------+---------+----------------------+---------------+--+
| 1   | Cordy       | Dupxx        | cdupey0@example.com         | Male    | 2018-05-09 12:56:15  | 2018-05-09    |
| 2   | Trumann     | Hindxxxx     | thindmoor1@example.com      | Male    | 2018-05-09 06:45:17  | 2018-05-09    |
| 7   | Stu         | Manxxm       | smanus6@example.net         | Male    | 2018-05-09 08:54:13  | 2018-05-09    |
| 8   | Petey       | Petxxx       | ppetofi7@example.edu        | Male    | 2018-05-09 02:04:42  | 2018-05-09    |
| 11  | Marilyn     | Brodxxx      | mbrodeaua@example.com       | Female  | 2018-05-09 21:47:17  | 2018-05-09    |
| 14  | Gibby       | Branxxxx     | gbrandinod@example.com      | Male    | 2018-05-09 10:52:04  | 2018-05-09    |
| 16  | Friederike  | Schwxxxx     | fschwartzf@example.gov      | Female  | 2018-05-09 02:42:17  | 2018-05-09    |
| 19  | Patrizius   | Hallexxxxx   | phalleybonei@example.net    | Male    | 2018-05-09 19:16:25  | 2018-05-09    |
| 20  | Chic        | Ecclesxxxx   | ceccleshallj@example.com    | Male    | 2018-05-09 16:39:13  | 2018-05-09    |
| 25  | Annabelle   | Dwerryhxxxx  | adwerryhouseo@example.com.  | Female  | 2018-05-09 16:20:31  | 2018-05-09    |
+-----+-------------+--------------+-----------------------------+---------+----------------------+---------------+--+
We have now converted CSV data to ORC format, dynamically partitioned the records, stored them in HDFS using NiFi, and read the data back from the Hive table.
All failure relationships are fed to a funnel and then to the PutEmail processor, which notifies the configured recipients for each incoming failure flowfile.
Please refer to this and this for more details regarding the UpdateRecord processor.
Please refer to this link for the NiFi Expression Language.
Reference flow.xml
Comment, created on 10-15-2021 06:07 AM:
I followed your steps but am getting an error on the ConvertRecord processor: "/partition_dt is invalid because 'partition_dt' is not an associated property or has no validator associated with it". How can I resolve this?