Kafka --> NiFi --> Parquet --> HDFS

Contributor

My flow looks like this:

ConsumeKafka_2_6 --- JoltTransformJSON --- ConvertRecord --- MergeRecord --- PutHDFS

My task: I receive data from Kafka in JSON format and need to write large files to HDFS in Parquet format. Instead I get small files of about 10 KB each, while I would like files of at least 100 MB (I changed the settings in MergeRecord, but it still sends small files). I also still get plain flow files, but I need Parquet. What is wrong?
1 - The files must be large
2 - The files must be Parquet

3 ACCEPTED SOLUTIONS


hi @VLban,

What do your files look like before they reach MergeRecord, and what do they look like after they have gone through it? Also, what settings did you use in your MergeRecord?


For your two requirements, everything depends on how you configure MergeRecord.

  • To generate Parquet files, set a Parquet writer in your Record Writer property.
  • For large files, you must define the Minimum Bin Size, the Minimum Number of Records and, optionally, the Max Bin Age. The Correlation Attribute Name can also help. A sketch of such a configuration follows below.
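
For illustration only, a MergeRecord configuration along these lines could produce larger Parquet files; the property names are the standard MergeRecord properties, but the concrete values are assumptions that you would need to tune to your own record size and latency requirements:

    MergeRecord
      Record Reader               JsonTreeReader           (reads the JSON coming from Kafka)
      Record Writer               ParquetRecordSetWriter   (writes the merged output as Parquet)
      Merge Strategy              Bin-Packing Algorithm
      Minimum Number of Records   100000                   (assumed value)
      Maximum Number of Records   1000000                  (assumed value)
      Minimum Bin Size            100 MB                   (lower bound for a merged bin)
      Maximum Bin Size            256 MB                   (assumed value)
      Max Bin Age                 10 min                   (flush a bin even if it never fills up)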

 



First of all, you need to identify how the data comes out of your Kafka processor. Once you have identified how the data comes from your Kafka brokers, you can define the Record Reader in your MergeRecord processor --> based on your original description I assume JSON, so you need a JsonTreeReader. Knowing that you want your data to be delivered as Parquet, you must define a ParquetRecordSetWriter controller service as your Record Writer, which will transform your data (read with the Record Reader) into a Parquet file (written with the Record Writer).
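
As a rough sketch of the two controller services involved (the schema settings shown here are assumptions; configure them to match how you actually handle schemas):

    JsonTreeReader (Record Reader)
      Schema Access Strategy    Infer Schema             (assumed; use a schema registry if you have one)

    ParquetRecordSetWriter (Record Writer)
      Schema Access Strategy    Inherit Record Schema    (assumed)
      Compression Type          SNAPPY                   (assumed, optional)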



Add an UpdateAttribute processor in front of PutHDFS and use the NiFi Expression Language to rename your file from ${filename} to ${filename}.parquet, then save it into HDFS wherever you want.
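
A minimal sketch of that UpdateAttribute step, adding one dynamic property that overwrites the standard filename attribute:

    UpdateAttribute (before PutHDFS)
      filename    ${filename}.parquet    (appends the .parquet extension to the existing name)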


15 REPLIES

Contributor

this method does not work: PutHDFS creates a directory with the extension (directory1.parquet) and inside that directory there is still a file without an extension. I also tried /user/test/${now():toNumber()}.parquet, but I need plain files in the same test directory, not .parquet directories


Add an UpdateAttribute processor in front of PutHDFS and use the NiFi Expression Language to rename your file from ${filename} to ${filename}.parquet and then save it into HDFS wherever you want.
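
Taking the /user/test path mentioned in the previous reply, a possible split of responsibilities (the path comes from that reply; the rest is only a sketch) is to rename the flow file in UpdateAttribute and keep only the directory in PutHDFS, since PutHDFS names the written file after the flow file's filename attribute:

    UpdateAttribute (before PutHDFS)
      filename     ${filename}.parquet   (rename the flow file itself)

    PutHDFS
      Directory    /user/test            (directory only; the file name comes from the filename attribute)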

Contributor

yes, that helped me; it is a pity that there is no built-in functionality for this in PutHDFS

Contributor

is it possible to run three consumer processors against Kafka so that the output data is not duplicated?

Contributor

can the PutHDFS processor write the already-converted Parquet file to HDFS, or do I need PutParquet to write to HDFS?

Contributor

another interesting point is how to implement on updateattribute the function of checking whether it was delivered to puthdfs because if hdfs runs out locally, the process continues to go and the files are not written but are thrown out of the queue and go to another file in the basket, in fact, files are lost if the meso runs out and the chain does not stop , you need to check if the file in ndfs did not arrive stop the stream or the memory ran out in ndfs stop putndfs and let the recycle bin fill up