Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Master Guru

Article

Short Description:

This Tutorial describes how to add partition field to the content of the flowfile, create dynamic partitions based on flowfile content and store data into HDFS directory using NiFi.

PartitionRecord Processor Params:

Record Reader :

Specifies the Controller Service to use for reading incoming data

Record Writer:

Specifies the Controller Service to use for writing out the records.

Dynamic Properties:

  • We need to add at least one property name and value(recordpath) that will be evaluated on each record of the flowfile.
  • Each record will be grouped with the other like records in the group.

**Note**

  • No attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).

Flow:

74384-flow.png

 

Flow Overview:

1. GenerateFlowfile //to generate csv data
2. UpdateAttribute //add schema name attribute to the flowfile
3. Update Record //add partition field to the flowfile content
4. Partition Record //create partitions based on flowfile content
5. ConvertAvroToORC //convert avro format data into ORC format
6. UpdateAttribute //change the filename to unique id(UUID)
7. PutHDFS //save the data into HDFS based on partition attribute value
8. ReplaceText //replace the content of the flowfile with HiveDDL and MSCK repair table statements
9. PutHiveQL //execute hive statements
10.PutEmail //sends an e-mail to configured recipients for each incoming failure flowfile.

Flow Explanation:

1.GenerateFlowFile Configs:

Configure the processor as shown below

74385-gflowfile.png

Sample data data.txt

2.UpdateAttribute Configurations:

 

Configure the processor as shown below

74386-add-schema-name-attribute.png

Added schema.name attribute to the flowfile so that we can use this name in our AvroSchemaRegistry and use same AvroSchemaRegistry for Record Reader/Writer controller services.

3.UpdateRecord Configs:

Record Reader:

Our input data is in csv format configure CSV reader controller service as shown below.

74387-csvreader.png

As we are not having header so I’m defining the schema in AvroSchemaRegistry.
If your incoming data having header then we can treat first line as header property to true and Schema Access Strategy Use String Fields From Header(nifi will take string type for all the header fields) by using these two properties we can generate schema dynamically.

Record Writer:

I’m using AvroRecordSetWriter to convert CSV format data into Avro then use ConvertAvroToORC processor to store orc files into HDFS.

Configure the controller service as shown below

74388-avrosetwriter.png

Schema write strategy property value as Embed Avro Schema //we are embedded Avro Schema into the content of the flowfile.

AvroSchemaRegistry:

  • In this controller we are defining our shared avro schema used by both reader/writer controllers in this flow.
  • Validate Field Names value as true //if you are having – in the field names then keep this property value to false then controller service won’t validate field names.

Add new property shown below screenshot

74389-avroschemaregistry.png

Add new property

sch

{
 "namespace": "nifi",
 "name": "person",
 "type": "record",
 "fields": [
  { "name": "id", "type": ["null","int"] },
  { "name": "first_name", "type": ["null","string"]},
  { "name": "last_name", "type": ["null","string"]},
  { "name": "email", "type": ["null","string"]},
  { "name": "gender", "type": ["null","string"]},
  { "name": "joined_date", "type": ["null","string"]},
  { "name": "partition_dt", "type": ["null","string"]}
 ]
}
  • In this avro schema I’m allowing null values for all the fields, if you don’t want to allow null values for specific fields then keep the type as just string/int without default values.
  • Then the processor will route the flowfile to failure relationship if the field value having null’s. (or) We can use validate record processor to validate contents of flowfile against given schema.
  • Enable AvroSchemaRegistry and all the reference controller services

Add new property in update record processor shown below screenshot74390-updaterecord.png

/partition_dt

format(toDate( /joined_date, "yyyy-MM-dd HH:mm:ss"),"yyyy-MM-dd")

We are adding partition_dt column to the content of flowfile based on joined_date field value in the format of yyyy-MM-dd.

Please refer to this link for more details regarding RecordPathDomainSpecific Language.

4.PartitionRecord Configs:

Record Reader

Configure/enable AvroReader controller service as shown below

74391-avroreader.png

We are using Schema Access Strategy property value as Use Embedded Avro Schema //as the feeding avro file will have schema embedded in it.

Record Writer

Configure/enable the AvroSetWriter controller service as shown below

74392-avrosetwriter.png

Add new dynamic property to Partition Record processor

74393-partitionrecord.png

partition_dt

/partition_dt
  1. Now the processor reads the incoming data and partitions the records based on the partition_dt value.
  2. Based on how many number of unique partition values we are having in the flowfile content those many output flowfiles are generated.
  3. Each flowfile will have partition_dt attribute with the value describes which partition this flowfile belongs to.

Output Flowfile attributes:

74397-partitionrecord-output.png

The processor added partition_dt attribute with value as 2018-05-10 and record.count 357(number of records in this flowfile)

5.ConvertAvroToORC Configs:

Keep the processor configs to default unless if you need to add ORC configuration resources property value.

6.UpdateAttribute Configs:

74394-updateattribute-filename.png

In this processor we are changing the filename to unique value.

7.PutHDFS Configs:

Configure/enable the processor based on your NiFi instance setup(Kerberos principal/keytab..etc)

74395-puthdfs.png

Directory

/user/shu/employee/partition_dt=${partition_dt}
  1. In this expression hive table is created at /user/shu/employee location and partition_dt partition column name.
  2. Based on partition_dt attribute value we are creating new directories (or) store files into existing directories dynamically.

Check for newly created sub-directories in employee directory

[bash$] hdfs dfs -ls /user/shu/employee/
Found 3 items
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10
drwxr-xr-x   - nifi hdfs          0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-11

Check newly created files in each sub-directories of employee directory

[bash$] hdfs dfs -ls -h -r /user/shu/employee/*
Found 1 items
-rw-r--r--   3 nifi hdfs     18.7 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09/da5c645a-36d4-415e-9920-467314b31900.orc
Found 1 items
-rw-r--r--   3 nifi hdfs     21.3 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10/121db330-7a61-4b8e-a1ee-5cb1692c6c22.orc
Found 1 items
-rw-r--r--   3 nifi hdfs     19.8 K 2018-05-13 10:14 /user/shu/employee/partition_dt=2018-05-11/b3e15fec-b78c-4f0b-ab0b-3fa89d057a9b.orc

8.ReplaceText Configs:

Configure the processor as shown below.

74396-replacetext.png

  1. In this processor we are having create table if not exist statement as we need to create partitioned table so we are not using hive.ddl attribute that has been added from ConvertAvroToORC processor.
  2. Each time when we add new file into HDFS directory hive doesn’t know about newly added files, we need to run msck repair table (or) alter table add partition(<partition-name>) location <location name> statements to update the newly added information in Hive metastore.

9.PutHiveQL Configs:

Configure/enable Hive connection pool

Based on the contents of flowfile this processor executes those statements when we are having more than one statement as content of flowfile then processor separates each statement based on Statement Delimiter value.

Rollback On Failure

True //failed flowfiles will be stayed in incoming connection without penalizing and try to reprocess repeatedly until processed successfully.It is important to set adequate 'Yield Duration' to avoid retrying too frequently.

False(default) //based on the error type flowfiles will be routed to either failure (or) retry relationship and processor can continue with next flowfile.

  • Based on number of records you are getting in each flowfile we need to use SplitRecord processor to split millions of records into smaller chunks to avoid out of memory issues.
  • If we are ending up small size files in partitions then we need to run compaction on the small files, copy the specific partition where there are lot of small files to temp table, make sure while compacting specific partition we are not writing any data to be written to this partition/directory in HDFS and overwrite the same partition with less number of files.

Check Hive table about partitions and data

hive> show partitions default.employee;
+--------------------------+--+
|        partition         |
+--------------------------+--+
| partition_dt=2018-05-09  |
| partition_dt=2018-05-10  |
| partition_dt=2018-05-11  |
+--------------------------+--+
3 rows selected (0.488 seconds)
hive> select * from default.employee limit 10;
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| id  | first_name  |  last_name   |             email             | gender  |     joined_date      | partition_dt  |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| 1   | Cordy       | Dupxx        | cdupey0@example.com          | Male    | 2018-05-09 12:56:15  | 2018-05-09    |
| 2   | Trumann     | Hindxxxx     | thindmoor1@example.com       | Male    | 2018-05-09 06:45:17  | 2018-05-09    |
| 7   | Stu         | Manxxm       | smanus6@example.net          | Male    | 2018-05-09 08:54:13  | 2018-05-09    |
| 8   | Petey       | Petxxx       | ppetofi7@example.edu         | Male    | 2018-05-09 02:04:42  | 2018-05-09    |
| 11  | Marilyn     | Brodxxx      | mbrodeaua@example.com        | Female  | 2018-05-09 21:47:17  | 2018-05-09    |
| 14  | Gibby       | Branxxxx     | gbrandinod@example.com       | Male    | 2018-05-09 10:52:04  | 2018-05-09    |
| 16  | Friederike  | Schwxxxx     | fschwartzf@example.gov       | Female  | 2018-05-09 02:42:17  | 2018-05-09    |
| 19  | Patrizius   | Hallexxxxx   | phalleybonei@example.net     | Male    | 2018-05-09 19:16:25  | 2018-05-09    |
| 20  | Chic        | Ecclesxxxx   | ceccleshallj@example.com     | Male    | 2018-05-09 16:39:13  | 2018-05-09    |
| 25  | Annabelle   | Dwerryhxxxx  | adwerryhouseo@example.com.   | Female  | 2018-05-09 16:20:31  | 2018-05-09    |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+

Now we have converted CSV data to ORC format, dynamically partitioned the records, stored to HDFS in NiFi and read data from Hive table.

All the failure relationships are feeded to funnel then to PutEmail processor to get notified to the configured recipients for each incoming failure flowfile.

Please refer to this and this links for more details regarding UpdateRecord Processor.

Please refer this link for NiFi Expression Language.

Reference flow.xml

create-dynamic-partitions-in-nifi.xml

16,035 Views
Comments

Followed your steps but getting error on ConvertRecord processor /partition_dt is invalid because 'partition_dt' is not an associated property or has no validator associated with it. How can I resolve?