Created on 05-14-2018 11:55 AM - edited on 03-11-2020 11:15 AM by SumitraMenon
Article
Short Description:
This tutorial describes how to add a partition field to the content of a flowfile, create dynamic partitions based on the flowfile content, and store the data in an HDFS directory using NiFi.
PartitionRecord Processor Params:
Record Reader:
Specifies the Controller Service to use for reading incoming data.
Record Writer:
Specifies the Controller Service to use for writing out the records.
Dynamic Properties:
- We need to add at least one dynamic property whose value is a RecordPath; it is evaluated against each record of the flowfile.
- Each record is grouped with the other records that produce the same value for that RecordPath.
**Note**
- No attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).
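The grouping rule above can be illustrated with a small Python sketch. It is not NiFi code: `partition_records` is a hypothetical helper that treats records as plain dictionaries and a single field name as a stand-in for a RecordPath such as /partition_dt. The `record.count` attribute name is the one shown later in this article.

```python
import json
from collections import defaultdict


def partition_records(records, field):
    """Group records that share the same value for `field`.

    Returns a list of (attributes, records) pairs, one per output group.
    The partition attribute is only set for non-null scalar values.
    """
    groups = defaultdict(list)
    for record in records:
        # json.dumps gives a hashable key even for list/dict values.
        key = json.dumps(record.get(field), sort_keys=True)
        groups[key].append(record)

    result = []
    for members in groups.values():
        value = members[0].get(field)
        attributes = {"record.count": str(len(members))}
        if value is not None and not isinstance(value, (list, dict)):
            attributes[field] = str(value)
        result.append((attributes, members))
    return result


sample = [
    {"id": 1, "partition_dt": "2018-05-09"},
    {"id": 2, "partition_dt": "2018-05-10"},
    {"id": 3, "partition_dt": "2018-05-09"},
    {"id": 4, "partition_dt": None},
]
for attributes, members in partition_records(sample, "partition_dt"):
    print(attributes, [m["id"] for m in members])
```

In this sketch the record with a null partition_dt still ends up in its own group; it simply carries no partition_dt attribute.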
Flow:
Flow Overview:
1. GenerateFlowFile //generate CSV data
2. UpdateAttribute //add the schema name attribute to the flowfile
3. UpdateRecord //add the partition field to the flowfile content
4. PartitionRecord //create partitions based on the flowfile content
5. ConvertAvroToORC //convert Avro format data into ORC format
6. UpdateAttribute //change the filename to a unique id (UUID)
7. PutHDFS //save the data into HDFS based on the partition attribute value
8. ReplaceText //replace the content of the flowfile with Hive DDL and MSCK repair table statements
9. PutHiveQL //execute the Hive statements
10. PutEmail //send an e-mail to the configured recipients for each incoming failure flowfile
Flow Explanation:
1.GenerateFlowFile Configs:
Configure the processor as shown below
Sample data data.txt
2.UpdateAttribute Configurations:
Configure the processor as shown below
We add the schema.name attribute to the flowfile so that we can reference this name in our AvroSchemaRegistry and use the same AvroSchemaRegistry for the Record Reader/Writer controller services.
3.UpdateRecord Configs:
Record Reader:
Our input data is in CSV format, so configure the CSVReader controller service as shown below.
As our data has no header, I’m defining the schema in AvroSchemaRegistry.
If your incoming data has a header, set the Treat First Line as Header property to true and set Schema Access Strategy to Use String Fields From Header (NiFi treats every header field as a string). With these two properties the schema is generated dynamically.
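As a rough illustration of the header-based idea (not NiFi's implementation or its exact output), the Python sketch below reads the first CSV line and declares every column as a string. The function name and the record name `inferred` are hypothetical.

```python
import csv
import io


def string_schema_from_header(csv_text, record_name="inferred"):
    """Build an Avro-style record schema in which every header column is a string."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return {
        "type": "record",
        "name": record_name,
        "fields": [{"name": column.strip(), "type": "string"} for column in header],
    }


sample = "id,first_name,joined_date\n1,Cordy,2018-05-09 12:56:15\n"
schema = string_schema_from_header(sample)
print([field["name"] for field in schema["fields"]])
```

Note that even the id column comes out as a string here, which is why an explicit schema is still useful when typed fields are needed.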
Record Writer:
I’m using AvroRecordSetWriter to convert the CSV data into Avro; the ConvertAvroToORC processor then converts it to ORC so that ORC files are stored in HDFS.
Configure the controller service as shown below
Set the Schema Write Strategy property to Embed Avro Schema //we are embedding the Avro schema into the content of the flowfile.
AvroSchemaRegistry:
- In this controller service we define the shared Avro schema used by both the reader and writer controller services in this flow.
- Validate Field Names is set to true //if your field names contain a character such as a hyphen (-), set this property to false so that the controller service does not validate field names.
Add a new property as shown in the screenshot below
sch
{
  "namespace": "nifi",
  "name": "person",
  "type": "record",
  "fields": [
    { "name": "id", "type": ["null","int"] },
    { "name": "first_name", "type": ["null","string"] },
    { "name": "last_name", "type": ["null","string"] },
    { "name": "email", "type": ["null","string"] },
    { "name": "gender", "type": ["null","string"] },
    { "name": "joined_date", "type": ["null","string"] },
    { "name": "partition_dt", "type": ["null","string"] }
  ]
}
- In this Avro schema I’m allowing null values for all the fields. If you don’t want to allow null values for specific fields, declare the type as plain string/int instead of a union with null.
- The processor will then route the flowfile to the failure relationship if such a field contains a null value. Alternatively, we can use the ValidateRecord processor to validate the contents of the flowfile against the given schema.
- Enable AvroSchemaRegistry and all the referencing controller services.
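To make the nullable versus non-nullable distinction concrete, here is a minimal Python sketch. It does not use an Avro library or NiFi; `null_violations` is a hypothetical helper that only inspects whether each field type is a union containing "null". The trimmed schema below, with id made mandatory, is an assumption for illustration.

```python
import json

# Trimmed variant of the schema: "id" is mandatory, "email" is nullable.
SCHEMA = json.loads("""
{
  "namespace": "nifi", "name": "person", "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "email", "type": ["null", "string"] }
  ]
}
""")


def null_violations(record, schema):
    """Return the names of non-nullable fields whose value is missing or None."""
    violations = []
    for field in schema["fields"]:
        field_type = field["type"]
        nullable = isinstance(field_type, list) and "null" in field_type
        if record.get(field["name"]) is None and not nullable:
            violations.append(field["name"])
    return violations


print(null_violations({"id": 1, "email": None}, SCHEMA))
print(null_violations({"id": None, "email": "a@example.com"}, SCHEMA))
```

A record that yields a non-empty list here corresponds to the kind of record that would fail against the stricter schema.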
Add a new property to the UpdateRecord processor as shown in the screenshot below
/partition_dt
format(toDate( /joined_date, "yyyy-MM-dd HH:mm:ss"),"yyyy-MM-dd")
We are adding a partition_dt column to the content of the flowfile, derived from the joined_date field value and formatted as yyyy-MM-dd.
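For readers who want to check the conversion outside NiFi, the same transformation can be sketched in Python. This is only an equivalent of the RecordPath expression, assuming joined_date always matches yyyy-MM-dd HH:mm:ss; the function name is hypothetical.

```python
from datetime import datetime


def to_partition_dt(joined_date):
    """Parse 'yyyy-MM-dd HH:mm:ss' and return only the 'yyyy-MM-dd' part."""
    parsed = datetime.strptime(joined_date, "%Y-%m-%d %H:%M:%S")
    return parsed.strftime("%Y-%m-%d")


print(to_partition_dt("2018-05-09 12:56:15"))  # prints 2018-05-09
```

A value that does not match the input pattern raises ValueError in this sketch.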
Please refer to this link for more details regarding the RecordPath Domain-Specific Language.
4.PartitionRecord Configs:
Record Reader
Configure/enable the AvroReader controller service as shown below
We are setting the Schema Access Strategy property to Use Embedded Avro Schema //the incoming Avro file has the schema embedded in it.
Record Writer
Configure/enable the AvroRecordSetWriter controller service as shown below
Add a new dynamic property to the PartitionRecord processor
partition_dt
/partition_dt
- Now the processor reads the incoming data and partitions the records based on the partition_dt value.
- One output flowfile is generated for each unique partition value found in the flowfile content.
- Each flowfile has a partition_dt attribute whose value identifies the partition the flowfile belongs to.
Output Flowfile attributes:
The processor added the partition_dt attribute with the value 2018-05-10 and record.count with the value 357 (the number of records in this flowfile).
5.ConvertAvroToORC Configs:
Keep the processor configuration at its defaults unless you need to set the ORC Configuration Resources property.
6.UpdateAttribute Configs:
In this processor we change the filename to a unique value (UUID).
7.PutHDFS Configs:
Configure/enable the processor based on your NiFi instance setup (Kerberos principal/keytab, etc.)
Directory
/user/shu/employee/partition_dt=${partition_dt}
- In this expression, /user/shu/employee is the location where the Hive table is created and partition_dt is the partition column name.
- Based on the partition_dt attribute value, we dynamically create new directories (or) store files into existing directories.
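The way the Directory expression and the unique filename combine into a final HDFS path can be sketched in Python. This is only string handling, not an HDFS client. The helper name is hypothetical, and the .orc suffix is an assumption based on the file names in the HDFS listing in this article.

```python
import posixpath
import uuid


def target_path(base_dir, attributes):
    """Resolve <base_dir>/partition_dt=<value>/<uuid>.orc from flowfile attributes."""
    directory = posixpath.join(base_dir, f"partition_dt={attributes['partition_dt']}")
    filename = f"{uuid.uuid4()}.orc"  # random UUID, so names do not collide
    return posixpath.join(directory, filename)


print(target_path("/user/shu/employee", {"partition_dt": "2018-05-10"}))
```

Because the directory name follows the partition_dt=<value> convention, Hive can later recognise each sub-directory as a partition of the table.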
Check for newly created sub-directories in the employee directory
[bash$] hdfs dfs -ls /user/shu/employee/
Found 3 items
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-11
Check the newly created files in each sub-directory of the employee directory
[bash$] hdfs dfs -ls -h -r /user/shu/employee/*
Found 1 items
-rw-r--r--   3 nifi hdfs     18.7 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09/da5c645a-36d4-415e-9920-467314b31900.orc
Found 1 items
-rw-r--r--   3 nifi hdfs     21.3 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10/121db330-7a61-4b8e-a1ee-5cb1692c6c22.orc
Found 1 items
-rw-r--r--   3 nifi hdfs     19.8 K 2018-05-13 10:14 /user/shu/employee/partition_dt=2018-05-11/b3e15fec-b78c-4f0b-ab0b-3fa89d057a9b.orc
8.ReplaceText Configs:
Configure the processor as shown below.
- This processor holds a create table if not exists statement. Because we need to create a partitioned table, we are not using the hive.ddl attribute added by the ConvertAvroToORC processor.
- Each time we add a new file to the HDFS directory, Hive doesn’t know about the newly added files. We need to run msck repair table (or) alter table add partition(<partition-name>) location <location name> statements to register the newly added information in the Hive metastore.
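The exact replacement text used in ReplaceText is only visible in the screenshot, so the following Python sketch is an assumption of what such content could look like. It builds a partitioned ORC table definition from the columns of the Avro schema, plus a repair statement, and joins them with a semicolon. The EXTERNAL keyword, the Hive column types, and the helper name are all illustrative choices, not the author's confirmed statements.

```python
# Column names come from the Avro schema; the Hive types are assumed.
COLUMNS = [
    ("id", "int"),
    ("first_name", "string"),
    ("last_name", "string"),
    ("email", "string"),
    ("gender", "string"),
    ("joined_date", "string"),
]


def hive_statements(table, location, columns, partition_column):
    """Return the DDL and repair statements for a partitioned ORC table."""
    column_list = ", ".join(f"{name} {hive_type}" for name, hive_type in columns)
    create = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({column_list}) "
        f"PARTITIONED BY ({partition_column} string) "
        f"STORED AS ORC LOCATION '{location}'"
    )
    repair = f"MSCK REPAIR TABLE {table}"
    return [create, repair]


statements = hive_statements("default.employee", "/user/shu/employee", COLUMNS, "partition_dt")
print(";\n".join(statements))
```

The partition column is declared only in PARTITIONED BY and not in the regular column list, because Hive derives its value from the directory name.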
9.PutHiveQL Configs:
Configure/enable the Hive connection pool
This processor executes the statements contained in the flowfile content. When the content holds more than one statement, the processor separates the statements based on the Statement Delimiter value.
Rollback On Failure
True //failed flowfiles stay in the incoming connection without being penalized and are retried repeatedly until they are processed successfully. It is important to set an adequate 'Yield Duration' to avoid retrying too frequently.
False (default) //based on the error type, flowfiles are routed to either the failure (or) retry relationship and the processor can continue with the next flowfile.
- Depending on the number of records in each flowfile, we may need the SplitRecord processor to split millions of records into smaller chunks to avoid out-of-memory issues.
- If we end up with small files in the partitions, we need to run compaction on them: copy the partition that contains many small files to a temp table, make sure no data is being written to that partition/directory in HDFS while it is being compacted, and then overwrite the same partition with a smaller number of files.
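The chunking idea behind SplitRecord can be shown with a short Python sketch. It works on an in-memory list for illustration only (a real flow streams records), and the function name is hypothetical; `records_per_split` mirrors the processor's Records Per Split setting.

```python
def split_records(records, records_per_split):
    """Split a list of records into consecutive chunks of at most records_per_split."""
    if records_per_split <= 0:
        raise ValueError("records_per_split must be a positive integer")
    return [
        records[start:start + records_per_split]
        for start in range(0, len(records), records_per_split)
    ]


chunks = split_records(list(range(10)), 4)
print([len(chunk) for chunk in chunks])  # prints [4, 4, 2]
```

Smaller splits lower memory pressure but produce more flowfiles, and therefore more small files per partition, so the split size and the compaction step described above need to be balanced against each other.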
Check the Hive table for partitions and data
hive> show partitions default.employee;
+--------------------------+--+
| partition                |
+--------------------------+--+
| partition_dt=2018-05-09  |
| partition_dt=2018-05-10  |
| partition_dt=2018-05-11  |
+--------------------------+--+
3 rows selected (0.488 seconds)
hive> select * from default.employee limit 10;
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| id  | first_name  | last_name    | email                         | gender  | joined_date          | partition_dt  |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| 1   | Cordy       | Dupxx        | cdupey0@example.com           | Male    | 2018-05-09 12:56:15  | 2018-05-09    |
| 2   | Trumann     | Hindxxxx     | thindmoor1@example.com        | Male    | 2018-05-09 06:45:17  | 2018-05-09    |
| 7   | Stu         | Manxxm       | smanus6@example.net           | Male    | 2018-05-09 08:54:13  | 2018-05-09    |
| 8   | Petey       | Petxxx       | ppetofi7@example.edu          | Male    | 2018-05-09 02:04:42  | 2018-05-09    |
| 11  | Marilyn     | Brodxxx      | mbrodeaua@example.com         | Female  | 2018-05-09 21:47:17  | 2018-05-09    |
| 14  | Gibby       | Branxxxx     | gbrandinod@example.com        | Male    | 2018-05-09 10:52:04  | 2018-05-09    |
| 16  | Friederike  | Schwxxxx     | fschwartzf@example.gov        | Female  | 2018-05-09 02:42:17  | 2018-05-09    |
| 19  | Patrizius   | Hallexxxxx   | phalleybonei@example.net      | Male    | 2018-05-09 19:16:25  | 2018-05-09    |
| 20  | Chic        | Ecclesxxxx   | ceccleshallj@example.com      | Male    | 2018-05-09 16:39:13  | 2018-05-09    |
| 25  | Annabelle   | Dwerryhxxxx  | adwerryhouseo@example.com.    | Female  | 2018-05-09 16:20:31  | 2018-05-09    |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
We have now converted the CSV data to ORC format, dynamically partitioned the records, stored them in HDFS using NiFi, and read the data from the Hive table.
All the failure relationships are fed into a funnel and then to the PutEmail processor, so that the configured recipients are notified for each incoming failure flowfile.
Please refer to this and this link for more details regarding the UpdateRecord processor.
Please refer to this link for the NiFi Expression Language.
Reference flow.xml
Created on 10-15-2021 06:07 AM
Followed your steps but I am getting an error on the ConvertRecord processor: /partition_dt is invalid because 'partition_dt' is not an associated property or has no validator associated with it. How can I resolve this?