
Kafka --> NiFi --> Parquet --> HDFS

avatar
Contributor

My process looks like this:
ConsumeKafka_2_6 --> JoltTransformJSON --> ConvertRecord --> MergeRecord --> PutHDFS
My task: I get data from Kafka in JSON format and need to write large files to HDFS in Parquet format. Instead I get small files of about 10 KB each; I would like files of at least 100 MB (I changed the settings in MergeRecord, but it still sends small files). I also just get plain flow files, but I need Parquet. What is wrong?
1 - The files must be large
2 - The files must be Parquet

3 ACCEPTED SOLUTIONS

avatar

Hi @VLban,

What do your files look like before reaching MergeRecord and what do they look like after they have gone through MergeRecord? Besides that, what settings did you use in your MergeRecord?


For your two requirements, everything depends on how you configure MergeRecord.

  • To generate Parquet files, set a Parquet record writer (ParquetRecordSetWriter) as the Record Writer property.
  • For large files, you must define the Minimum Bin Size, the Minimum Number of Records, and optionally the Max Bin Age. The Correlation Attribute Name can also help. A sketch of possible settings follows this list.
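
For example, a minimal MergeRecord configuration aiming at bins of roughly 100 MB could look like the sketch below; the concrete thresholds are assumptions for illustration, not values confirmed in this thread.

Record Reader: JsonTreeReader
Record Writer: ParquetRecordSetWriter
Merge Strategy: Bin-Packing Algorithm
Minimum Number of Records: 100000
Maximum Number of Records: 1000000
Minimum Bin Size: 100 MB
Maximum Bin Size: 256 MB
Max Bin Age: 10 min
Maximum Number of Bins: 5

A bin only becomes eligible for merging once both minimums are met, so the Minimum Bin Size is what drives the ~100 MB target; Max Bin Age just flushes out a bin that never reaches the minimums.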

 

View solution in original post

avatar

First of all, you need to identify how the data comes out of your Kafka processor. Once you have identified how the data arrives from your Kafka brokers, you can define the Record Reader in your MergeRecord processor; based on your original description I assume JSON, so you need a JsonTreeReader. Since you want your data delivered as Parquet, you must define a ParquetRecordSetWriter controller service as your Record Writer, which will transform the data (read with the Record Reader) into a Parquet file (written with the Record Writer).
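
As an illustration, the two controller services could be configured roughly as follows; the values are a sketch based on the description above and on the settings posted later in this thread, not a verified configuration.

JsonTreeReader (used as the Record Reader):
Schema Access Strategy: Infer Schema

ParquetRecordSetWriter (used as the Record Writer):
Schema Access Strategy: Inherit Record Schema
Compression Type: SNAPPY (or UNCOMPRESSED)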

View solution in original post

avatar

Add an UpdateAttribute processor in front of PutHDFS and use the NiFi Expression Language to rename your file from ${filename} to ${filename}.parquet, then save it into HDFS wherever you want.
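
As a sketch, the UpdateAttribute processor only needs one user-defined property; filename is the standard core attribute that PutHDFS uses as the file name in the target directory.

Property: filename
Value: ${filename}.parquet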

View solution in original post

15 REPLIES

avatar

Hi @VLban,

What do your files look like before reaching MergeRecord and what do they look like after they have gone through MergeRecord? Besides that, what settings did you use in your MergeRecord?


For your two requirements, everything depends on how you configure MergeRecord.

  • To generate Parquet files, set a Parquet record writer (ParquetRecordSetWriter) as the Record Writer property.
  • For large files, you must define the Minimum Bin Size, the Minimum Number of Records, and optionally the Max Bin Age. The Correlation Attribute Name can also help.

 


avatar
Contributor

The process is still not clear to me. I managed to shorten it to ConsumeKafka --> MergeContent --> PutHDFS, but the output in HDFS is GZ, not Parquet. I need MergeContent to somehow merge the FlowFiles coming from Kafka into one large file and still end up with Parquet. How can I make this happen?

FlowFile Properties
Key: 'entryDate'
Value: 'Tue Apr 25 09:14:38 MSK 2023'
Key: 'lineageStartDate'
Value: 'Tue Apr 25 09:14:38 MSK 2023'
Key: 'fileSize'
Value: '1305'
FlowFile Attribute Map Content
Key: 'filename'
Value: '597dd31f-294d3-f5301d4c446b'
Key: 'kafka.consumer.id'
Value: 'readtopicnifi'
Key: 'kafka.consumer.offsets.committed'
Value: 'true'
Key: 'kafka.leader.epoch'
Value: '2'
Key: 'kafka.max.offset'
Value: '11016028'
Key: 'kafka.offset'
Value: '110168'
Key: 'kafka.partition'
Value: '0'
Key: 'kafka.timestamp'
Value: '1681997605'
Key: 'kafka.topic'
Value: 'ng'
Key: 'path'
Value: './'
Key: 'uuid'
Value: '091096d655236-4b0f-a0bf-55у3d3e819'


MergeRecord with a JsonTreeReader as the Record Reader doesn't work on the FlowFiles coming from the Kafka processor. How can this be implemented for Parquet?

avatar

First of all, you need to identify how the data comes out of your Kafka processor. Once you have identified how the data arrives from your Kafka brokers, you can define the Record Reader in your MergeRecord processor; based on your original description I assume JSON, so you need a JsonTreeReader. Since you want your data delivered as Parquet, you must define a ParquetRecordSetWriter controller service as your Record Writer, which will transform the data (read with the Record Reader) into a Parquet file (written with the Record Writer).

avatar
Contributor

I don't have a JSON schema. I want it to infer a schema from the input data and write it to Parquet with my parameters, but this does not work.

avatar
Contributor

My settings for JsonTreeReader:

Schema Access Strategy: Infer Schema
Schema Inference Cache: No value set
Starting Field Strategy: Root Node
Date Format: yyyy-MM-dd
Time Format: HH:mm:ss
Timestamp Format: yyyy-MM-dd'T'HH:mm:ss.SSSX
 
ParquetRecordSetWriter:

Schema Write Strategy: Do Not Write Schema
Schema Cache: No value set
Schema Access Strategy: Inherit Record Schema
Cache Size: 1000
Compression Type: UNCOMPRESSED
Row Group Size: No value set
Page Size: No value set
Dictionary Page Size: No value set
Max Padding Size: No value set
Enable Dictionary Encoding: No value set
Enable Validation: No value set
Writer Version: No value set
Avro Write Old List Structure: false
Avro Add List Element Records: false
INT96 Fields: No value set
 

avatar
Contributor
ConvertRecord[id=bc90b7d4-0187-1000-0000-00003eadf340] Failed to process FlowFile[filename=043b3d7b-5de0-4f7e-842f-2cbe5c972ace]; will route to failure: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group file {

Now I'm trying a different approach:

(screenshot: Снимок экрана 2023-04-26 в 15.10.13.png)

AvroSchemaRegistry:

{
  "type": "record",
  "name": "kafka_record",
  "fields": [
    {"name": "upstream_response_time", "type": "string"},
    {"name": "ssl_cipher", "type": "string"},
    {"name": "upstream", "type": "string"},
    {"name": "log", "type": {
      "type": "record",
      "name": "log",
      "fields": [
        {"name": "offset", "type": "long"},
        {"name": "file", "type": {
          "type": "record",
          "name": "file",
          "fields": []
        }}
      ]
    }},
    {"name": "method", "type": "string"},
    {"name": "clientip", "type": "string"},
    {"name": "user_agent", "type": "string"},
    {"name": "realip", "type": "string"},
    {"name": "http_accept_encoding", "type": "string"},
    {"name": "country", "type": "string"},
    {"name": "timestamp_delivery", "type": "string"},
    {"name": "http_accept_language", "type": "string"},
    {"name": "scheme", "type": "string"},
    {"name": "request_id", "type": "string"},
    {"name": "http_referer", "type": "string"},
    {"name": "req_lengths", "type": "string"},
    {"name": "server_protocol", "type": "string"},
    {"name": "request", "type": "string"},
    {"name": "request_time", "type": "string"},
    {"name": "ssl_protocol", "type": "string"},
    {"name": "host", "type": "string"},
    {"name": "cache", "type": "string"},
    {"name": "input", "type": {
      "type": "record",
      "name": "input",
      "fields": []
    }},
    {"name": "agent", "type": {
      "type": "record",
      "name": "agent",
      "fields": []
    }},
    {"name": "hostname_logstash", "type": "string"},
    {"name": "x_requested_with", "type": "string"},
    {"name": "status", "type": "string"},
    {"name": "project_id", "type": "string"},
    {"name": "cookie_session", "type": "string"},
    {"name": "timestamp", "type": "string"},
    {"name": "serverip", "type": "string"},
    {"name": "geo", "type": "string"},
    {"name": "source", "type": "string"},
    {"name": "upstream_status", "type": "string"},
    {"name": "upstream_port", "type": "string"},
    {"name": "hostname", "type": "string"},
    {"name": "size", "type": "string"},
    {"name": "ssl_ja3_hash", "type": "string"},
    {"name": "sni", "type": "string"},
    {"name": "http_accept", "type": "string"},
    {"name": "location_id", "type": "string"},
    {"name": "server_port", "type": "string"},
{"name": "timestamp_record", "type": "string"},
{"name": "param_request", "type": "string"}
]
}
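
The InvalidSchemaException quoted above ("Cannot write a schema with an empty group: required group file") points at the nested records declared with "fields": [] (file, input, agent): Parquet cannot write a group that has no columns. One way around it, sketched below, is to give each of those records at least one field; the field name path is a hypothetical example, so use whatever actually appears in the incoming JSON. Alternatively, drop the empty records from the schema (and from the data, e.g. with JoltTransformJSON).

    {"name": "file", "type": {
      "type": "record",
      "name": "file",
      "fields": [
        {"name": "path", "type": ["null", "string"], "default": null}
      ]
    }}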

(screenshot: Снимок экрана 2023-04-26 в 15.11.42.png)

avatar
Contributor

I figured out this error. Now I have another problem: I get files in HDFS, but they have no .parquet extension, and I don't understand what kind of files these are, given that I set no compression anywhere.

avatar

If you need the .parquet extension, you can use PutHDFS, define the path to your target location, and add the extension after your filename, something like /path/to/where/you/want/${filename}.parquet.
Otherwise you can add an UpdateAttribute before PutHDFS and rename your FlowFile from ${filename} to ${filename}.parquet.
Or you can use PutParquet, and that's all.
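
If you want to double-check what PutHDFS actually wrote regardless of the extension: a Parquet file starts and ends with the 4-byte marker PAR1. A quick local check after pulling a file down with hdfs dfs -get could look like this minimal Python sketch (the file name is a placeholder):

import sys

# A Parquet file begins and ends with the magic bytes b"PAR1".
path = sys.argv[1] if len(sys.argv) > 1 else "merged-file"  # placeholder name
with open(path, "rb") as f:
    head = f.read(4)   # first 4 bytes
    f.seek(-4, 2)      # jump to 4 bytes before the end of the file
    tail = f.read(4)   # last 4 bytes

print("looks like Parquet" if head == b"PAR1" and tail == b"PAR1" else "does not look like Parquet")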