Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Nifi Merge Content Processor with Defragment errors out

avatar
Explorer

flow1.pngHello,

I am new to Nifi and need some help with Merging the flow files.

Attaching my flow, flow1.png and merge content settings and error mergeprocesorsetting.png.

error.png

I am trying to publish csv records to Kafka using Publish Kafka record Processor. The file may contain thousand to million records.

Before sending to Kafka I am splitting my file and replacing some text.So I am trying to send individual record to Kafka.

After successfully publishing them, I want to put all the individual records in a single header csv file.

I tried Merge Processor with Defragment strategy, It works fine for 10000 records, but when my file is bigger

It gives error, can not defragment. I tried to set Maximum number of entry to no value but it takes 1000 as default. not sure, how to set it blank.

I also tried Bin packing strategy, but it creates many files and merges with many header for each record.

Can anyone guide me how to merge, flowfiles to single file.

Please help me fix this error.

8 REPLIES 8

avatar
Master Guru

@Nisha Patel

Take off Max Bin Age property value, as you are configured as 10sec so merge content processor will wait 10 sec and merges all the flowfiles but for defragement strategy processor needs 38225 files and found only 10k files, So this is routing to failure.

In Addition use Record oriented processor(update record..etc) and then use PublishKafkaRecord processor, record oriented processors are intended to work on batch of records you don't have to use multiple split processors at all.

Flow:

1.ListFile
2.FetchFile
3.UpdateRecord //you can use this processor instead of Replace Text processor
4.PublishKafkaRecord
--other processing

avatar
Explorer

@Shu,

Thank you for replying.

I will try your suggestion and let you know.

My intention for splitting records was to capture individual records, that failed to publish to kafka.

Right now, if any record fails, entire file fails, and I don't know how to capture only failed one and continue sending other records.

Can you please suggest me better approach?

Thank you.

avatar
Explorer

@Shu,

I tried your suggestion but my output file has header on each record.

Can you please guide me, how to have a single header for the merged file?

I appreciate your time.

Thank you.

avatar
Master Guru

@Nisha Patel

Could you please add your flow screenshot to understand how your flow is designed? and configurations of MergeContent processor.

avatar
Explorer

@Shu,

Thank you so much for responding. Attaching flow and Merge content configuration. By the way merge content create more than one file and adds header on each record. I am not sure, what is the best setting for merging.

flow1.pngflow2.pngflow3.png

Thank you.


avatar
Master Guru
@Nisha Patel

If you look into configs of PublishKafkaRecord processor there is Record Reader/Writer controller services, so if your Record Writer is CsvSetWriter then you have configured Include Heder Line property value as true. i.e on each record you are writing the header so when we use Merge Content processor you are going to have header lines included for each record.

.

To resolve this issue Change the Include Heder Line value to False(now we are not writing header to each record) and then in Merge Content processor keep the Header property value as your header.

So by following this way after merging completes then processor adds Header to the file.

.

Is there a specific reason why are you using PublishKafkaRecord processor?

You can even use PublishKafka processor(because as you are splitting each record so there is no need to use Record oriented processors in this case unless if you have some valid reason.) which doesn't require any Record reader/writer controller services, so the message that we published into Kafka topic will be routed to Success relationship.

Then use Merge Content processor to Merge all these flowfiles into one and then add the Header to the merged file.

Flow:

Replace PublishKafkaRecord processor with PublishKafka processor
MergeContent processor

avatar
Explorer

@Shu,

Thank you for taking time to reply.

I am using PublishKafkaRecord processor with CSVReader and AvroRecordSetWriter services.

for two reasons, I have to convert my csv reocrds to avro, 2) to check if Schema of incoming message matches to my avro schema.I don't want to send any incoming messages to kafka, but only the once which are successfully converted to my schema.

Please let me know, if I can provide any other info.

Thank you.

avatar
Explorer

@Shu,

can you please help me resolve it?

I greately appreciate your time.

Thank you.