Created 04-19-2023 03:52 AM
My process looks like this:
ConsumeKafka_2_6 --- JoltTransformJSON --- ConvertRecord --- MergeRecord --- PutHDFS
My task: I get data from Kafka in JSON format and need to write large files to HDFS in Parquet format. However, I get small files of about 10 KB each, while I would like files of at least 100 MB (I changed the settings in MergeRecord, but it still sends small files). Also, the flow files I get are not Parquet. What am I doing wrong?
1- The files must be large
2- The files must be Parquet
Created 04-19-2023 06:58 AM
hi @VLban,
What do your files look like before reaching MergeRecord, and what do they look like after they went through MergeRecord? Besides that, what settings did you use in your MergeRecord?
For your two requirements, everything depends on how you configure MergeRecord.
Created 04-26-2023 12:55 AM
First of all, you need to identify in which format the data comes from your Kafka processor. Once you have identified that, you can define the Record Reader in your MergeRecord processor --> based on your original description I assume JSON, so you need a JsonTreeReader. Knowing that you want your data to be delivered as Parquet, for your Record Writer you must define a ParquetRecordSetWriter controller service, which will take the data read by the Record Reader and write it out as a Parquet file.
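As a rough sketch (property names as in recent NiFi versions; the size and age thresholds are assumptions you would tune to your own data volume), a MergeRecord configuration for this use case could look like:

MergeRecord
    Record Reader             = JsonTreeReader (matching your Kafka message schema)
    Record Writer             = ParquetRecordSetWriter
    Merge Strategy            = Bin-Packing Algorithm
    Minimum Bin Size          = 100 MB
    Maximum Bin Size          = 256 MB
    Maximum Number of Records = 1000000
    Max Bin Age               = 10 min

Note that a bin is also closed as soon as "Maximum Number of Records" is reached, and its default is quite low (1,000), so if you leave it at the default you will keep getting small files regardless of the size thresholds.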
Created 04-27-2023 01:12 AM
This method does not work: PutHDFS creates a directory with the extension (directory1.parquet), and inside that directory there is still a file without an extension. I also tried /user/test/${now():toNumber()}.parquet, but I need the files to end up in the same test directory, not in separate .parquet directories.
Created 04-27-2023 01:14 AM
Add an UpdateAttribute in front of PutHDFS and use the NiFi Expression Language to rename your file from ${filename} to ${filename}.parquet, then save it into HDFS wherever you want.
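For reference, a minimal sketch of that setup (the directory path is just an example):

UpdateAttribute (placed right before PutHDFS)
    filename = ${filename}.parquet        (NiFi Expression Language, appends the extension)

PutHDFS
    Directory = /user/test                (the target HDFS folder only)

PutHDFS writes the FlowFile into the configured Directory under the name held in its filename attribute, which is why putting ".parquet" into the Directory property creates a directory with that extension instead of renaming the file.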
Created 04-27-2023 10:20 PM
Yes, that helped me. It's a pity there is no built-in functionality for this in PutHDFS.
Created 04-28-2023 03:17 AM
Is it possible to run three consumer processors against Kafka in such a way that the output data is not duplicated?
Created on 04-26-2023 08:19 AM - edited 04-26-2023 08:30 AM
Can the PutHDFS processor write the transformed Parquet file to HDFS, or do I need PutParquet to write to HDFS?
Created 04-28-2023 05:05 AM
Another interesting point is how to implement, for example with UpdateAttribute, a check of whether a file was actually delivered by PutHDFS. If HDFS runs out of space, the flow keeps running: the files are not written, but they are still pulled off the queue and discarded, so in practice data is lost whenever space runs out and the chain does not stop. I need a way to either stop the stream when a file did not arrive in HDFS, or stop PutHDFS when HDFS is out of space and let the queue fill up instead.