NiFi - Creating the output directory from the content of the flow file

Rising Star

Hi,

I am ingesting data from a MySQL DB using the ExecuteSQL --> ConvertAvroToJSON --> PublishKafka_0_10 processors. The result has selected columns from the table plus the extract_date. The output looks like {"col1": val1, "col2": "val2", "col3": "val3", "col4": val4,"extract_date": "2017-12-21 00:17:10.0"} and is stored in a Kafka topic.

After that, I have another workflow that consumes from the Kafka topic and writes to an HDFS folder. [ConsumeKafka_0_10 --> PutHDFS]

My requirement is: while consuming the messages from the Kafka topic, use the hour value from the extract_date field and write each message to the corresponding hour folder in HDFS.

For example: if the "extract_date" field has the value "2017-12-21 00:17:10.0", the message should be written to HDFS under /folder/year=2017/month=12/day=21/hour=00/{filename}

Is there a way to achieve this use case?

Thanks,

Bala

1 ACCEPTED SOLUTION

Master Guru

@Bala S

You need to extract the extract_date value from the content and add it as an attribute on the flowfile. For example, if the message is JSON, use the EvaluateJsonPath processor; if the content is CSV, use the ExtractText processor to pull out the date value and keep it as a flowfile attribute.
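As a rough illustration of what this extraction step does, here is a minimal Python sketch. It is not NiFi code: the sample messages, the `attributes` dict, and the regular expression are assumptions made up for the example. It only shows the idea of copying one value out of the content into a flowfile-style attribute map.

```python
import json
import re

# Hypothetical sample content; the column values are placeholders.
json_message = '{"col1": 1, "col2": "val2", "extract_date": "2017-12-21 00:17:10.0"}'
csv_message = "1,val2,val3,4,2017-12-21 00:17:10.0"

# JSON case: look the field up by name (the role EvaluateJsonPath plays).
json_attributes = {"extract_date": json.loads(json_message)["extract_date"]}

# CSV case: match the timestamp with a regular expression (the role ExtractText plays).
timestamp_pattern = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?"
csv_attributes = {"extract_date": re.search(timestamp_pattern, csv_message).group(0)}

print(json_attributes)
print(csv_attributes)
```

Either way, the content itself is left unchanged; only the attribute map gains an extract_date entry that later processors can reference.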

Once the value is extracted and the attribute is associated with the flowfile, follow the steps below.

For testing, I have an extract_date attribute with the value 2017-12-21 00:17:10.0 associated with the flowfile.

Then use an UpdateAttribute processor and add the following new properties:

day

${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("dd")}

hour

${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("HH")}

month

${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("MM")}

year

${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("yyyy")}


With these properties, UpdateAttribute dynamically generates the year, month, day, and hour attributes from the extract_date attribute.
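To sanity-check the values these four attributes should end up with, the same derivation can be sketched in Python. This is only an analogy to the Expression Language calls, not NiFi itself, and the `attributes` dict is a hypothetical stand-in for the flowfile attributes. Python's `strptime` is strict about trailing text, so the sketch matches the ".0" fraction explicitly with `%f`.

```python
from datetime import datetime

# Sample value from the thread; the trailing ".0" is the fractional-seconds part.
extract_date = "2017-12-21 00:17:10.0"

# Parse the timestamp, including the fraction, into a datetime.
parsed = datetime.strptime(extract_date, "%Y-%m-%d %H:%M:%S.%f")

# One zero-padded string per partition level, matching the folder names in the question.
attributes = {
    "year": parsed.strftime("%Y"),
    "month": parsed.strftime("%m"),
    "day": parsed.strftime("%d"),
    "hour": parsed.strftime("%H"),
}
print(attributes)
```

For the sample value, day should come out as 21 and month as 12; if both show the same number, the day pattern is probably using the month code.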

Then use a PutHDFS processor with the Directory property set to:

/folder/year=${year}/month=${month}/day=${day}/hour=${hour}/${filename}

The UpdateAttribute processor adds the year, month, day, and hour attributes to the flowfile as described above, and the Directory property then uses those attributes.

PutHDFS will create the directories in HDFS if they do not exist.

If this answer helped resolve your issue, click the Accept button below to accept it. That helps community users find solutions quickly for this kind of problem.



6 REPLIES

Rising Star

@Shu,

Thanks for your quick reply.

After configuring the UpdateAttribute processor as directed, I end up with the directory structure below:

/folder/year=/month=/day=/hour=/677944128880138

The year, month, day, and hour folders do not have the proper values populated.

It should be created as /folder/year=2017/month=12/day=22/hour=19/{filename}

Note: the current run has "extract_date": "2017-12-22 19:16:17.0"

UpdateAttribute processor configuration: [screenshot: updateattribute-processor.png]

Master Guru
@Bala S

I think you don't have an extract_date attribute associated with the flowfile.

Attribute screenshot: [attribute.png]

Make sure every flowfile processed by the UpdateAttribute processor has the extract_date attribute associated with it. To check the attributes:

  1. Right-click on the queue
  2. Click List queue
  3. Select any flowfile
  4. Click the Attributes tab; the extract_date attribute should be listed with its value.

If the attribute has no value, the directory names will be empty (the same issue you are facing now).
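The empty folder names can be reproduced with a small Python sketch. This is a toy model, not NiFi's Expression Language: the `expand` helper is hypothetical, and it assumes only that a reference to an attribute that is not set contributes an empty string to the path.

```python
import re

def expand(template, attributes):
    """Replace each ${name} with the attribute value, or "" when it is absent."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes.get(m.group(1), ""), template)

DIRECTORY = "/folder/year=${year}/month=${month}/day=${day}/hour=${hour}/${filename}"

# Without the date-derived attributes, only the filename is filled in.
missing = expand(DIRECTORY, {"filename": "677944128880138"})

# With them, every partition level gets its value.
populated = expand(
    DIRECTORY,
    {"year": "2017", "month": "12", "day": "22", "hour": "19",
     "filename": "677944128880138"},
)

print(missing)
print(populated)
```

The first result has the same shape as the path reported in the thread, which is why a missing extract_date attribute is the first thing to check.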

How to add the attribute to the flowfile?
You need to extract the extract_date value from the content and add it as an attribute on the flowfile.

Example: if the message is JSON, use the EvaluateJsonPath processor; if the content is CSV, use the ExtractText processor to extract the date value and keep it as a flowfile attribute.

Once the value is extracted and the attribute is associated with the flowfile, the UpdateAttribute and PutHDFS steps described earlier should work.

Rising Star

@Shu,

My bad. I missed configuring EvaluateJsonPath.

Here is my EvaluateJsonPath processor config:

[screenshot: evaluatejsonpath.png]

Workflow:

[screenshot: workflow.png]

However, while configuring the EvaluateJsonPath processor, I get the error message shown below:

[screenshot: error.png]

I hope my workflow sequencing is correct. Please correct me if I am wrong.

Rising Star

Hi,

It's working fine now. The issue was with the configuration of the EvaluateJsonPath processor.

I had missed a "." in the path for the attribute to fetch. I have accepted the answer. Thanks a lot for your help!

It should be "$.extract_date" instead of "$extract_date".
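To show why the dot matters, here is a deliberately tiny Python sketch. It is not the JSONPath implementation NiFi uses: `read_top_level_field` is a hypothetical helper that understands only the "$.field" form, where "$" is the document root and ".field" selects a child of it. The other column in the sample message is a placeholder.

```python
import json

def read_top_level_field(document, path):
    """Toy stand-in for a JSONPath lookup; supports only the form "$.field"."""
    if not path.startswith("$."):
        raise ValueError(f"not a valid path: {path!r} (expected '$.field')")
    return json.loads(document)[path[2:]]

message = '{"col2": "val2", "extract_date": "2017-12-22 19:16:17.0"}'

# The root marker followed by ".extract_date" selects the field.
value = read_top_level_field(message, "$.extract_date")
print(value)

# Without the dot there is no child selector, so the path is rejected.
try:
    read_top_level_field(message, "$extract_date")
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```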

Contributor

Hi @Shu

I am trying to create a directory in HDFS named with the current timestamp. I tried the approach above, but it did not work because my flowfiles do not have an extract_date attribute.

What should I use to get the desired output?

Right now my flow is: ListHDFS --> UpdateAttribute --> PutHDFS.

Any suggestions, please?