Created 12-22-2017 06:52 PM
Hi,
I am ingesting data from a MySQL DB using the ExecuteSQL --> ConvertAvroToJSON --> PublishKafka_0_10 processors. The result contains selected columns from the table plus an extract_date field. The output looks like {"col1": val1, "col2": "val2", "col3": "val3", "col4": val4,"extract_date": "2017-12-21 00:17:10.0"} and is stored in a Kafka topic.
I then have another workflow that consumes from the Kafka topic and writes the messages into an HDFS folder [ConsumeKafka_0_10 --> PutHDFS].
My requirement is: while consuming the messages from the Kafka topic, use the hour value from the extract_date field and write each message to the corresponding hour folder in HDFS.
For example, if the "extract_date" field has the value "2017-12-21 00:17:10.0", the message should be written to HDFS under /folder/year=2017/month=12/day=21/hour=00/{filename}
Is there a way to achieve this?
Thanks,
Bala
Created on 12-22-2017 07:09 PM - edited 08-17-2019 05:39 PM
You need to extract the extract_date value from your content and add it as an attribute on the flowfile. For example, if the content is a JSON message, use the EvaluateJsonPath processor; if the content is CSV, use the ExtractText processor to extract the date value into an attribute.
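To make the extraction step concrete, here is a minimal Python sketch of what it does for one message, assuming a flat JSON message shaped like the one in the question. It is not NiFi code: extract_date_attribute is a hypothetical helper and the column values are made up. In NiFi itself you would add a property named extract_date with the value $.extract_date to EvaluateJsonPath and set its Destination to flowfile-attribute.

```python
import json

def extract_date_attribute(content: bytes) -> dict:
    """Hypothetical stand-in for EvaluateJsonPath with Destination set to
    flowfile-attribute and one property: extract_date = $.extract_date."""
    record = json.loads(content)
    # $.extract_date selects the top-level "extract_date" field of the message.
    return {"extract_date": str(record["extract_date"])}

# Sample message shaped like the one in the question (column values are made up).
message = b'{"col1": 1, "col2": "val2", "col3": "val3", "col4": 4, "extract_date": "2017-12-21 00:17:10.0"}'
print(extract_date_attribute(message))
# {'extract_date': '2017-12-21 00:17:10.0'}
```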
Once the value is extracted and the attribute is associated with the flowfile, follow the steps below.
For testing, I have an extract_date attribute with the value 2017-12-21 00:17:10.0 associated with the flowfile.
Then add an UpdateAttribute processor with the following new properties (property name, then value):
year
${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("yyyy")}
month
${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("MM")}
day
${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("dd")}
hour
${extract_date:toDate("yyyy-MM-dd HH:mm:ss"):format("HH")}
With these properties, UpdateAttribute dynamically generates the year, month, day and hour attributes from the extract_date attribute.
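To check what the four expressions yield for a given extract_date, the minimal Python sketch below mimics them. It is an approximation under stated assumptions: NiFi evaluates the expressions with Java's SimpleDateFormat in the JVM's default time zone, while this sketch uses Python's datetime on a naive timestamp, and partition_attributes is a hypothetical helper name.

```python
from datetime import datetime

# The Java pattern "yyyy-MM-dd HH:mm:ss" translated to Python strptime syntax.
PARSE_PATTERN = "%Y-%m-%d %H:%M:%S"

def partition_attributes(extract_date: str) -> dict:
    """Hypothetical helper mimicking the year/month/day/hour properties."""
    # Java's DateFormat.parse(String) may stop before the end of the text, so the
    # trailing ".0" is ignored there; strptime needs an exact match, so only the
    # first 19 characters ("2017-12-21 00:17:10") are parsed here.
    ts = datetime.strptime(extract_date[:19], PARSE_PATTERN)
    return {
        "year": ts.strftime("%Y"),   # format("yyyy")
        "month": ts.strftime("%m"),  # format("MM")
        "day": ts.strftime("%d"),    # format("dd"); "MM" would repeat the month
        "hour": ts.strftime("%H"),   # format("HH"), 24-hour clock 00-23
    }

print(partition_attributes("2017-12-21 00:17:10.0"))
# {'year': '2017', 'month': '12', 'day': '21', 'hour': '00'}
```

Using HH rather than hh keeps midnight as 00 instead of 12, which matches the hour=00 folder in the question.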
Then configure the PutHDFS processor's Directory property as
/folder/year=${year}/month=${month}/day=${day}/hour=${hour}
UpdateAttribute adds the year, month, day and hour attributes to the flowfile as described above, and the Directory property then references those attributes. There is no need to append ${filename} to the Directory: PutHDFS already writes the file into that directory using the flowfile's filename attribute, so adding it would create an extra folder level.
PutHDFS creates the directories in HDFS if they do not already exist.
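As a rough illustration of where each file ends up, the Python sketch below evaluates the directory template for one flowfile, assuming Directory is set to /folder/year=${year}/month=${month}/day=${day}/hour=${hour}. hdfs_directory and hdfs_file_path are hypothetical helpers and the filename value is made up; PutHDFS does this resolution itself.

```python
def hdfs_directory(attrs: dict, base: str = "/folder") -> str:
    """Hypothetical helper: evaluates
    /folder/year=${year}/month=${month}/day=${day}/hour=${hour} for one flowfile."""
    return (f"{base}/year={attrs['year']}/month={attrs['month']}"
            f"/day={attrs['day']}/hour={attrs['hour']}")

def hdfs_file_path(attrs: dict, base: str = "/folder") -> str:
    # PutHDFS names the written file after the flowfile's "filename" attribute,
    # so the final path is the evaluated directory plus that name.
    return f"{hdfs_directory(attrs, base)}/{attrs['filename']}"

# Attribute values as UpdateAttribute would set them; the filename is made up.
attrs = {"year": "2017", "month": "12", "day": "21", "hour": "00",
         "filename": "example.json"}
print(hdfs_file_path(attrs))
# /folder/year=2017/month=12/day=21/hour=00/example.json
```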
Created on 12-22-2017 07:33 PM - edited 08-17-2019 05:39 PM
@Shu,
Thanks for your quick reply.
After configuring the UpdateAttribute processor as directed, I end up with the directory structure below:
/folder/year=/month=/day=/hour=/677944128880138. The year, month, day and hour folders do not have the proper values populated.
It should be created as /folder/year=2017/month=12/day=22/hour=19/{filename}
Note: the current run has "extract_date" set to "2017-12-22 19:16:17.0".
UpdateAttribute processor configuration:
[Screenshot: UpdateAttribute processor configuration]
Created on 12-22-2017 07:44 PM - edited 08-17-2019 05:39 PM
I think the extract_date attribute is not associated with your flowfiles.
[Screenshot: flowfile attributes]
Make sure every flowfile processed by the UpdateAttribute processor has an extract_date attribute associated with it. To check, view the flowfile's attributes.
If the attribute has no value, the directory names will be empty (the same issue you are facing now).
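The symptom can be reproduced with a simplified Python model of the Directory value: every ${name} reference to an attribute that is missing renders as an empty string, leaving bare "year=", "month=" and so on. This is only a sketch that handles plain ${name} references, not NiFi's full Expression Language, and render_directory is a hypothetical helper.

```python
import re

def render_directory(template: str, attrs: dict) -> str:
    """Hypothetical helper: replace each ${name} with the attribute value,
    or with an empty string when the flowfile has no such attribute."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attrs.get(m.group(1), ""), template)

TEMPLATE = "/folder/year=${year}/month=${month}/day=${day}/hour=${hour}"

# No partition attributes on the flowfile: the folder names come out empty.
print(render_directory(TEMPLATE, {}))
# /folder/year=/month=/day=/hour=

# With the attributes derived from "2017-12-22 19:16:17.0":
print(render_directory(TEMPLATE, {"year": "2017", "month": "12", "day": "22", "hour": "19"}))
# /folder/year=2017/month=12/day=22/hour=19
```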
How do you add the attribute to the flowfile?
Extract the extract_date value from your content and add it as an attribute on the flowfile.
For example, if the content is a JSON message, use the EvaluateJsonPath processor; if the content is CSV, use the ExtractText processor to extract the date value into an attribute.
Once the value is extracted, the attribute is associated with the flowfile and the UpdateAttribute expressions can use it.
Created on 12-22-2017 08:47 PM - edited 08-17-2019 05:39 PM
@Shu,
My bad, I missed configuring EvaluateJsonPath.
Here is my EvaluateJsonPath processor configuration:
[Screenshot: EvaluateJsonPath processor configuration]
Workflow:
[Screenshot: workflow]
However, after configuring the EvaluateJsonPath processor, I get the error message below:
[Screenshot: error message]
I hope my workflow sequence is correct. Please correct me if I am wrong.
Created 12-22-2017 09:31 PM
Hi,
Now it's working fine. The issue was the configuration of the EvaluateJsonPath processor.
I missed a "." in the path of the attribute to fetch: it should be "$.extract_date" instead of "$extract_date".
I have accepted the answer. Thanks a lot for your help!
Created 06-12-2019 08:02 PM
Hi @Shu,
I am trying to create a directory in HDFS named with the current timestamp. I tried the approach above, but it did not work because my flowfiles do not have an extract_date attribute.
What should I use to get the desired output?
Right now my flow is: ListHDFS --> UpdateAttribute --> PutHDFS.
Any suggestions, please?