Help replacing a Flume workflow with NiFi

Contributor

Hi there,

I am currently trying to replace our Flume workflow with NiFi.

For some background, I am working with JSON files where each line is an event (bid, login, purchase, comment, etc.) together with other information that comes with it.

What I am trying to do is:

1. Read the JSON file line by line

2. Grab the event type from each line

3. Store events of the same type together in HDFS, in files of around 1 GB each, under a directory based on the event type, e.g. events/${event_type}. Preferably, I don't want to store lots of tiny files.
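To pin down the intended behaviour of these three steps, here is a plain-Python sketch of the same logic outside NiFi. It is only an illustration under assumptions: each line is taken to carry its type in a key named `event_type` (the real field name may differ), `bucket_events` is a hypothetical helper, and the "roughly 1 GB per file" rule is modelled as a byte threshold on each per-type buffer.

```python
import json
from collections import defaultdict
from typing import Iterable

ONE_GB = 1024 ** 3  # target size per output file, as described in the post


def bucket_events(lines: Iterable[str], field: str = "event_type",
                  max_bytes: int = ONE_GB) -> list[tuple[str, str]]:
    """Group newline-delimited JSON events by type.

    Returns (directory, file_content) pairs, one per file that would be
    written. `field` is an assumed key name for the event type.
    """
    buffers: dict[str, list[str]] = defaultdict(list)
    sizes: dict[str, int] = defaultdict(int)
    files: list[tuple[str, str]] = []

    def flush(event_type: str) -> None:
        # Emit everything buffered for this type as one file, then reset it.
        files.append((f"events/{event_type}", "\n".join(buffers[event_type])))
        buffers[event_type].clear()
        sizes[event_type] = 0

    for raw in lines:                         # 1. read line by line
        line = raw.strip()
        if not line:
            continue
        event_type = json.loads(line)[field]  # 2. grab the event type
        buffers[event_type].append(line)      # 3. collect events per type
        sizes[event_type] += len(line.encode("utf-8")) + 1  # +1 for newline
        if sizes[event_type] >= max_bytes:
            flush(event_type)

    for event_type in list(buffers):          # leftovers become partial files
        if buffers[event_type]:
            flush(event_type)
    return files


sample = [
    '{"event_type": "bid", "amount": 5}',
    '{"event_type": "login", "user": "a"}',
    '{"event_type": "bid", "amount": 7}',
]
# A tiny threshold so the example produces one "full" file.
for directory, content in bucket_events(sample, max_bytes=60):
    print(directory, len(content.splitlines()), "events")
```

Buffering per type rather than per input file is what keeps the output files large; in NiFi that role is played by MergeContent.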

I have attached screenshots of some of the different approaches I have tried, but I haven't been able to get the desired flow.

Thanks a lot!

Brendan

Attachments: 38480-merge.png, 38479-routing.png

Accepted solution

Master Guru

You may want to take a look at the record-oriented processors. There is a processor called PartitionRecord that works somewhat like RouteText, but a lot nicer: you can send in a big set of JSON documents and have it partition the records based on a field in each record (i.e. eventType). PartitionRecord takes the field as a RecordPath in a dynamic property, such as eventType = /eventType, and writes the value to an attribute of that name on each outgoing FlowFile. From there you can use MergeContent with a Correlation Attribute Name of eventType to merge together the appropriate number of events for each type, and then PutHDFS with a Directory of something like /data/${eventType}. This way you need only one MergeContent and one PutHDFS to handle all of the event types.
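Why one MergeContent and one PutHDFS are enough can be shown with a small Python model of the binning. This is a simplified sketch, not NiFi's implementation: FlowFiles are plain dicts, `render_directory` is a hypothetical helper that only substitutes `${name}` placeholders (real Expression Language does much more), and a bin is released once it reaches a byte threshold, loosely standing in for MergeContent's group-size settings. The `eventType` attribute is assumed to have been set upstream by PartitionRecord.

```python
import re
from collections import defaultdict


def render_directory(template: str, attributes: dict[str, str]) -> str:
    """Hypothetical stand-in for NiFi Expression Language: only replaces
    ${name} with the FlowFile attribute of that name."""
    return re.sub(r"\$\{([\w.]+)\}",
                  lambda m: attributes.get(m.group(1), ""), template)


def merge_and_put(flowfiles: list[dict], correlation_attribute: str = "eventType",
                  min_bytes: int = 1024 ** 3,
                  directory: str = "/data/${eventType}") -> list[tuple[str, str]]:
    """Bin FlowFiles by a correlation attribute (like MergeContent), release a
    bin once it holds min_bytes, and return (hdfs_directory, merged_content)."""
    bins: dict = defaultdict(list)
    sizes: dict = defaultdict(int)
    written: list[tuple[str, str]] = []

    def release(key) -> None:
        merged = "\n".join(ff["content"] for ff in bins[key])
        # Every FlowFile in a bin shares the correlation value, so the
        # first one's attributes are enough to resolve the directory.
        path = render_directory(directory, bins[key][0]["attributes"])
        written.append((path, merged))
        del bins[key], sizes[key]

    for ff in flowfiles:
        key = ff["attributes"].get(correlation_attribute)
        bins[key].append(ff)
        sizes[key] += len(ff["content"].encode("utf-8"))
        if sizes[key] >= min_bytes:
            release(key)

    # The real processor would keep partial bins open (or release them after
    # Max Bin Age); this model simply flushes them at the end.
    for key in list(bins):
        release(key)
    return written


flowfiles = [
    {"attributes": {"eventType": "bid"}, "content": '{"eventType":"bid"}'},
    {"attributes": {"eventType": "login"}, "content": '{"eventType":"login"}'},
    {"attributes": {"eventType": "bid"}, "content": '{"eventType":"bid"}'},
]
for path, content in merge_and_put(flowfiles, min_bytes=30):
    print(path, len(content.splitlines()), "records")
```

Because the bin key and the output directory both come from the same attribute, adding a new event type needs no new processors.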

The unfortunate thing is that PartitionRecord may not work for your data, since newline-delimited JSON documents are technically not a valid JSON document as a whole, but it might be worth a try. If you control the upstream data, you could always make it a valid JSON array of documents.
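The difference between newline-delimited JSON and a single JSON array can be seen with Python's standard `json` module. A minimal sketch; `to_json_array` is a hypothetical helper and the sample records are made up.

```python
import json


def to_json_array(ndjson_text: str) -> str:
    """Wrap newline-delimited JSON documents in a single JSON array.
    Each non-blank line is parsed first, so a malformed line fails loudly."""
    docs = [json.loads(line) for line in ndjson_text.splitlines() if line.strip()]
    return json.dumps(docs)


ndjson = '{"eventType": "bid"}\n{"eventType": "login"}\n'

# The whole text is not one JSON document: the parser reads the first
# object and then rejects what follows as extra data.
try:
    json.loads(ndjson)
except json.JSONDecodeError as err:
    print("not a single JSON document:", err.msg)

print(to_json_array(ndjson))
```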

If you can't do either of those, I think RouteText is the best option. After RouteText you could send each route to an UpdateAttribute processor that adds an attribute called eventType with the corresponding type, then send them all to a single MergeContent and PutHDFS, using the Correlation Attribute Name and dynamic HDFS directory approach described above.
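The RouteText plus per-route UpdateAttribute approach can be modelled in Python as well. This is an illustrative stand-in, not RouteText itself: the route names, the regular expressions (which assume the type appears as `"eventType": "<name>"` in the raw line) and the sample data are hypothetical. Like RouteText, it bundles all lines matching a route into one FlowFile; the per-route attribute maps play the role of the separate UpdateAttribute processors.

```python
import re

# Hypothetical RouteText dynamic properties: one regular expression per
# event type, assuming the type appears as "eventType": "<name>" in each line.
ROUTES = {
    "bid": re.compile(r'"eventType"\s*:\s*"bid"'),
    "login": re.compile(r'"eventType"\s*:\s*"login"'),
}

# One UpdateAttribute per route, each adding its own fixed eventType value.
UPDATE_ATTRIBUTE = {route: {"eventType": route} for route in ROUTES}


def route_text(text: str) -> list[dict]:
    """Bundle the lines matching each route into one FlowFile per route,
    then apply that route's UpdateAttribute. The patterns are mutually
    exclusive here, so each line goes to its first (and only) match."""
    grouped: dict[str, list[str]] = {}
    for line in text.splitlines():
        route = next((name for name, pattern in ROUTES.items()
                      if pattern.search(line)), "unmatched")
        grouped.setdefault(route, []).append(line)
    return [
        {"route": route,
         "attributes": dict(UPDATE_ATTRIBUTE.get(route, {})),
         "content": "\n".join(lines)}
        for route, lines in grouped.items()
    ]


text = "\n".join([
    '{"eventType": "bid", "amount": 5}',
    '{"eventType": "login", "user": "a"}',
    '{"eventType": "bid", "amount": 7}',
])
for ff in route_text(text):
    print(ff["route"], ff["attributes"], len(ff["content"].splitlines()))
```

After this step every FlowFile carries an eventType attribute, so the single MergeContent and PutHDFS pair can take over exactly as in the PartitionRecord design.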

Contributor

Hi there @Bryan Bende, thanks a lot for your answer.

I think I am going to go with the RouteText option for now.

A couple of questions about that:

  • Should RouteText go to a single UpdateAttribute processor, or to a separate one for each event type? I did try just one UpdateAttribute processor with event_type set to ${RouteText.Route}.
  • How should MergeContent be set up? I have tried the setup shown below, but it joins all the different event types into one file, whereas I want a separate file for each event type that goes into HDFS once it reaches about 1 GB.

Attachment: 38503-merge-v2.png

Contributor

I found my problem: I was using ${event_type} in the Correlation Attribute Name, when it should just be event_type.
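The effect of that fix can be illustrated with a small Python model of MergeContent's binning. It assumes, as in this thread, a single UpdateAttribute that sets `event_type` from `${RouteText.Route}`, the attribute in which RouteText records the route a FlowFile took. In this model the Correlation Attribute Name is used literally as an attribute name; NiFi's own handling of an expression in that property differs in detail, but the symptom is the same: `${event_type}` does not name the `event_type` attribute, so every FlowFile falls into one bin. `update_attribute` and `bin_by` are hypothetical helpers.

```python
from collections import defaultdict


def update_attribute(attributes: dict) -> dict:
    """One UpdateAttribute rule for all routes: event_type = ${RouteText.Route}."""
    return {**attributes, "event_type": attributes.get("RouteText.Route")}


def bin_by(flowfiles: list[dict], correlation_attribute_name: str) -> dict:
    """Group FlowFile attribute maps by the value of the named attribute.
    If no FlowFile has an attribute with that name, they all share the
    value None and land in a single bin."""
    bins = defaultdict(list)
    for attributes in flowfiles:
        bins[attributes.get(correlation_attribute_name)].append(attributes)
    return dict(bins)


routed = [update_attribute({"RouteText.Route": route})
          for route in ("bid", "login", "bid")]

print(len(bin_by(routed, "${event_type}")))  # -> 1: no attribute has that name
print(len(bin_by(routed, "event_type")))     # -> 2: one bin per event type
```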

All sorted now, thanks a lot for the help!