Created on 08-31-2017 09:50 PM - edited 08-17-2019 05:06 PM
Hi there,
I am currently trying to replace our Flume workflow with NiFi.
For a bit of background, I am working with JSON files where each line is an event (bid, login, purchase, comment, etc.) along with the other information that comes with that event.
What I am trying to do is:
1. Read the JSON file line by line
2. Grab the event type from each line
3. Store events of the same type together in, say, 1 GB files in HDFS, in a directory based on the event type, e.g. events/${event_type}. Preferably I want to avoid storing lots of tiny files. (A rough sketch of what I mean is below.)
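To make the intent concrete, here is a rough Python sketch of the behaviour I am after; the event_type field name and the file paths are only illustrative, not our real schema:

import json
from collections import defaultdict

# Rough illustration only: read newline-delimited JSON events and bucket
# them by event type, the way I would like the files laid out in HDFS.
# The "event_type" field name and the output paths are assumptions.
buckets = defaultdict(list)

with open("events.json") as src:
    for line in src:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        buckets[event["event_type"]].append(line)

for event_type, lines in buckets.items():
    # In the real flow this would be a large (~1 GB) file under
    # events/<event_type>/ in HDFS rather than a small local file.
    with open("events_" + event_type + ".json", "w") as out:
        out.write("\n".join(lines) + "\n")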
I have attached some of the different ways I have tried, but haven't been able to get the desired flow.
Thanks a lot!
Brendan
Created 09-01-2017 01:47 PM
You may want to take a look at the record-related processors. There is a processor called PartitionRecord that works somewhat like RouteText, but is a lot nicer: you can send in a big set of JSON documents and have it partition the records based on a field in each record (i.e. eventType). From there you can use MergeContent with a Correlation Attribute Name of eventType to merge together the appropriate number of events for each type, and then PutHDFS with a Directory of something like /data/${eventType}. This way you only need one MergeContent and one PutHDFS to handle all of the event types.
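Roughly, the flow and the key properties would look something like this (the values are just examples, and PartitionRecord also needs a JSON Record Reader and Record Writer configured as controller services):

(your source) -> PartitionRecord -> MergeContent -> PutHDFS

PartitionRecord
    Record Reader:  JsonTreeReader (controller service)
    Record Writer:  JsonRecordSetWriter (controller service)
    eventType:      /eventType        <- user-defined property; a RecordPath evaluated against each record

MergeContent
    Correlation Attribute Name: eventType
    (tune the minimum/maximum group size so the merged files come out around 1 GB)

PutHDFS
    Directory: /data/${eventType}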
The unfortunate thing is that PartitionRecord may not work for your data, since technically JSON documents separated by new lines are not a valid JSON document in themselves, but it might be worth a try... If you have control of the upstream data you could always make it a valid array of JSON documents.
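If you do control the upstream producer, that conversion is simple; for example, in Python (the file names here are just placeholders):

import json

# Hypothetical example: turn newline-delimited JSON into a single valid JSON array.
with open("events.jsonl") as src, open("events_array.json", "w") as dst:
    records = [json.loads(line) for line in src if line.strip()]
    json.dump(records, dst)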
If you can't do any of that, then I think RouteText is the best option... After RouteText you could have each route go to an UpdateAttribute processor that adds an attribute called eventType with the corresponding type, then send them all to a single MergeContent and PutHDFS using the approach I described above, with the Correlation Attribute and the dynamic HDFS directory.
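In that case the flow would look roughly like this (the matching values are only examples of how you might pick out each event type from a line):

RouteText
    Routing Strategy:  Route to each matching Property Name
    Matching Strategy: Contains
    bids:    "event_type": "bid"      <- one user-defined property per event type
    logins:  "event_type": "login"
each route -> UpdateAttribute (sets eventType = bid, login, ...)
all routes -> MergeContent (Correlation Attribute Name: eventType) -> PutHDFS (Directory: /data/${eventType})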
Created on 09-04-2017 12:28 AM - edited 08-17-2019 05:06 PM
Hi there @Bryan Bende, thanks a lot for your answer.
I think I am going to go with the RouteText option for now.
A couple of questions with that:
Created 09-05-2017 04:36 AM
I found my problem: I was using ${event_type} as the Correlation Attribute Name, where it should just be event_type.
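In other words, the MergeContent property just needed to be:

MergeContent
    Correlation Attribute Name: event_type    <- the attribute's name itself, not an Expression Language reference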
All is sorted now, thanks a lot for the help!