
Consuming Kafka JSON messages and writing to HDFS as one file?


I'm new to NiFi. I'm trying to consume Kafka JSON messages and write them to HDFS as a single file, with the filename set to the current date.

I've tried using MergeContent and writing to HDFS, but it created multiple files.

This is what my MergeContent configuration looks like:

[screenshot: mergecontent.png]

1 ACCEPTED SOLUTION

Super Mentor

@Naresh Kumar Korvi

You will want to stick with the "Bin-Packing Algorithm" merge strategy in your case. The reason you are ending up with multiple files is the way the MergeContent processor is designed to work. There are several factors in play here:

[screenshot: 12166-screen-shot-2017-02-06-at-81825-am.png]

The MergeContent processor will start the content of each new FlowFile on a new line. However, the incoming content of a FlowFile may itself span multiple lines, so it may be desirable to put a user-defined demarcator between the content of each FlowFile if you need to differentiate the merged content at a later time. For that case, the MergeContent processor provides a "Demarcator" property.
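As a rough sketch, a bin-packing configuration along these lines would batch many small Kafka messages into one larger file (the property names are MergeContent's own; the values are illustrative, not prescriptive):

Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Minimum Number of Entries: 1000 (keeps small bins from being merged too early)
Maximum Number of Entries: 10000
Max Bin Age: 5 min (flushes a bin even if the minimums are never reached)
Demarcator: a newline, so each JSON message lands on its own line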

An UpdateAttribute processor can be used after the MergeContent processor to set a new "filename" on the resulting merged FlowFile. I am not sure of the exact filename format you want to use, but here is an example config that produces a filename like "2017-02-06":

[screenshot: 12167-screen-shot-2017-02-06-at-84448-am.png]
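For instance, an UpdateAttribute action along these lines (the yyyy-MM-dd format is assumed from the "2017-02-06" example above) would set it:

filename:

${now():format('yyyy-MM-dd')}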

Thanks,

Matt


8 REPLIES

Super Guru
@Naresh Kumar Korvi

Can you update your Minimum Number of Entries to the minimum you want, say 500 files? Also, since it is all similar data going into one file, I am assuming the FlowFile attributes are the same. Can you change your Merge Strategy to Defragment?

Finally, I assume your flow is something like this:

ConsumeKafka -> MergeContent -> PutHDFS

Is that right?

Super Mentor

The "defragment" merge strategy can only be used to Merge files that have very specific attributes assigned to them. That strategy is typically used to reassemble a FlowFile that was previously split apart by NiFi.


@Naresh Kumar Korvi You want it to look a bit like this:

  • Note the header, footer, and demarcator - these aggregate your JSON records into a properly formatted document for later reading (see the sketch after the screenshot below)
  • Set a max bin age so the final few messages do not get stuck in the queue
  • Set a min size and min number of entries to stop lots of little files being written
  • Set a max size and max number of entries to generate files of the size you want to work with
  • Play with the values using the GenerateFlowFile processor to create appropriately sized test content if your Kafka dataflow is a bit slow
  • Your flow should be ConsumeKafka -> MergeContent -> UpdateAttribute (set filename, path) -> PutHDFS

[screenshot: 12165-screen-shot-2017-02-06-at-100615.png]
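A sketch of those three MergeContent properties, with values assumed here (the actual ones are in the screenshot), so the merged output forms one valid JSON array:

Header: [
Demarcator: , followed by a newline
Footer: ]

With those set, merging {"a":1} and {"b":2} produces [{"a":1},{"b":2}] (with a line break after each comma), which reads back as a single JSON document.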


@Dan Chaffey

Yes, my NiFi data flow has the same design.

Thanks Dan, I've made the changes you recommended.



@Matt

Thanks Matt, this is similar to what I'm looking for. But how do I create a directory based on a date condition?

For example: based on the date range, it should create a directory dynamically.

This is what I'm expecting the directory structure to be:

period1-year/p1-week1/date/date.json

I'm not sure if I have the right condition in UpdateAttribute.

[screenshot: updateattrib-on-rules.png]

Super Mentor

@Naresh Kumar Korvi

The "Conditions" specified for your rule must result in a boolean "true" before the associated "Actions" will be applied against the incoming FlowFile. Your condition you have in the screenshot will always resolve to true...

[screenshot: 12196-screen-shot-2017-02-07-at-91722-am.png]

Looking at your "dirname" attribute, it is not going to return your desired directory path of:

period1-year/p1-week1/date

and your "filename" attribute will be missing the .json extension you are looking for as well:

date.json

I believe what you are trying to do is better accomplished using the "Condition" and "Action" configurations below:

[screenshot: 12197-screen-shot-2017-02-07-at-92138-am.png]

Condition:

${now():format('MM'):le(2):and(${now():format('dd'):le(25)})}

dirname:

period1-${now():format('yyyy')}/p1-${now():format('ww')}/${now():format('MM-dd-yyyy')}

filename:

${now():format('MM-dd-yyyy')}.json

[screenshot: 12198-screen-shot-2017-02-07-at-92245-am.png]
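As a rough worked example (the week number from 'ww' depends on the server's locale), a FlowFile processed on 2017-02-07 would get:

Condition: true (month 02 is at most 2, day 07 is at most 25)

dirname: period1-2017/p1-06/02-07-2017

filename: 02-07-2017.json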

Thanks,

Matt


@Matt

Thanks, Matt, this is what I was looking for.