Created 04-19-2023 03:52 AM
My process looks like this:
ConsumeKafka_2_6 --- JoltTransformJSON --- ConvertRecord --- MergeRecord --- PutHDFS
My task: I get data from Kafka in JSON format and need to write large files to HDFS in Parquet format. Instead I get small files of about 10 KB each, while I would like files of at least 100 MB (I changed the settings in the merge processor, but it still sends small files). I also just end up with plain flow files, but I need Parquet. What is wrong?
1 - The files must be large (at least 100 MB)
2 - The files must be Parquet
Created 04-19-2023 06:58 AM
Hi @VLban,
What do your files look like before reaching MergeRecord, and what do they look like after they have gone through MergeRecord? Besides that, what settings did you use in your MergeRecord?
For both of your requirements, everything depends on how you configure MergeRecord.
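For the "at least 100 MB" requirement, a rough sketch of the MergeRecord bin-sizing properties could look like the following; the exact values are assumptions and need to be tuned to your actual throughput:

MergeRecord
    Merge Strategy            = Bin-Packing Algorithm
    Minimum Number of Records = 100000      # keep high enough that bins are not flushed while still tiny
    Maximum Number of Records = 1000000
    Minimum Bin Size          = 100 MB      # lower bound for a merged file
    Maximum Bin Size          = 256 MB
    Max Bin Age               = 15 min      # safety valve: flush whatever has accumulated after this time

A bin is only merged once the minimums are met (or the bin age expires), so if you still see small files, one of these minimums is probably being satisfied too early.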
Created 04-26-2023 12:55 AM
First of all, you need to identify in which format the data comes out of your Kafka processor. Once you have identified how the data arrives from your Kafka brokers, you can define the Record Reader in your MergeRecord processor --> based on your original description I assume JSON, so you need a JsonTreeReader. Since you want your data to be delivered as Parquet, you must define a ParquetRecordSetWriter controller service as your Record Writer; it takes the data read by the Record Reader and writes it out as a Parquet file.
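In MergeRecord that pairing would look roughly like this (the controller-service names are whatever you called the services when you created them):

MergeRecord
    Record Reader = JsonTreeReader           # reads the JSON records coming from Kafka
    Record Writer = ParquetRecordSetWriter   # writes each merged bin out as one Parquet file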
Created 04-27-2023 01:14 AM
Add an UpdateAttribute processor in front of PutHDFS and use the NiFi Expression Language to rename your file from ${filename} to ${filename}.parquet, and then save it into HDFS wherever you want.
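A minimal sketch of that UpdateAttribute step; "filename" is the standard core attribute that PutHDFS uses as the target file name:

UpdateAttribute (right before PutHDFS)
    filename = ${filename}.parquet    # dynamic property: appends the .parquet extension to the existing filename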
Created 04-25-2023 11:23 PM
The process is still not clear to me. I managed to shorten it to ConsumeKafka --- MergeContent --- PutHDFS, but the output in HDFS is GZ, not Parquet. MergeContent does combine the FlowFiles coming from Kafka into one large file, but how do I make that happen in Parquet?
MergeRecord with a JsonTreeReader as the Record Reader doesn't work on the FlowFiles coming from the Kafka processor. How can this be implemented for Parquet?
Created 04-26-2023 03:47 AM
I don't have a JSON schema. I want it to infer the schema from the input data and write Parquet according to my settings, but this does not work.
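For reference, a schema-less (inference-based) configuration would typically look like the sketch below, assuming a NiFi version in which the JSON reader offers "Infer Schema":

JsonTreeReader (controller service)
    Schema Access Strategy = Infer Schema             # derive the schema from the incoming JSON itself
ParquetRecordSetWriter (controller service)
    Schema Access Strategy = Inherit Record Schema    # reuse whatever schema the reader produced
    Compression Type       = UNCOMPRESSED             # optional; SNAPPY is also common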
Created on 04-26-2023 02:25 AM - edited 04-26-2023 03:46 AM
My settings for the JsonTreeReader:
Created 04-26-2023 05:12 AM
ConvertRecord[id=bc90b7d4-0187-1000-0000-00003eadf340] Failed to process FlowFile[filename=043b3d7b-5de0-4f7e-842f-2cbe5c972ace]; will route to failure: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group file {
I'm trying a different approach now: an AvroSchemaRegistry.
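For what it's worth, that InvalidSchemaException usually means the schema contains a record/group field with no children (here the field named "file"), which Parquet cannot write. A hedged sketch of an AvroSchemaRegistry set-up, with a schema name and fields that are purely illustrative, could look like:

AvroSchemaRegistry (controller service)
    my_kafka_event = { "type": "record", "name": "my_kafka_event",      # dynamic property: schema name -> Avro schema text
                       "fields": [
                         { "name": "file", "type": { "type": "record", "name": "file_info",
                                                     "fields": [ { "name": "name", "type": "string" } ] } },
                         { "name": "ts", "type": "string" } ] }

JsonTreeReader (controller service)
    Schema Access Strategy = Use 'Schema Name' Property
    Schema Registry        = AvroSchemaRegistry
    Schema Name            = my_kafka_event
ParquetRecordSetWriter (controller service)
    Schema Access Strategy = Inherit Record Schema

Every nested record must have at least one field, otherwise Parquet raises the "empty group" error above.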
Created on 04-26-2023 08:00 AM - edited 04-26-2023 08:00 AM
I figured out that error. Now I have another problem: the files do land in HDFS, but they have no .parquet extension, and I don't understand what kind of files they are, since I set no compression anywhere.
Created 04-26-2023 01:12 PM
If you need the .parquet extension you can use PutHDFS, define the path to your location, and add the extension after your filename. Something like: /path/to/where/you/want/${filename}.parquet.
Otherwise you can add an UpdateAttribute before PutHDFS and rename your FlowFile from ${filename} to ${filename}.parquet.
Or you can use PutParquet, and that's all.
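As a rough sketch of the PutParquet alternative (the directory and the Hadoop config paths are examples only; PutParquet writes the records out as Parquet itself, so no separate conversion step is needed):

PutParquet
    Hadoop Configuration Resources = /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml   # example paths
    Record Reader                  = JsonTreeReader         # reads the (merged) JSON records
    Directory                      = /data/landing/parquet  # example target directory in HDFS
    Compression Type               = SNAPPY                 # optional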