
Delay in processing a huge number of FlowFiles using the ConsumeKafka processor in NiFi

Contributor

Hi,

We are trying to fetch data from one of our Kafka clusters (200,000 messages per second), where the topic has 19 partitions, using NiFi (a cluster of 4 hosts) and its built-in ConsumeKafka_2_6 processor. However, we are observing a delay in processing messages in the processor (lineage time of 7 seconds per message). We tried increasing the concurrent tasks to match the partition count, but that did not help. If we use a demarcation strategy (to club multiple messages into a single FlowFile) it works fine, but then we need individual messages for data transformation.

The downstream processor that does the JSON evaluation has a similar problem, as it cannot handle the large volume of incoming data from the ConsumeKafka processor.

 

Note: the backpressure object threshold is 100,000 FlowFiles and the size threshold is 1 GB.

Please find below reference screenshots of the configured processors.

 

[Screenshot: ConsumeKafka_2_6 processor configuration (Onkar_Gagre_0-1646825996592.png)]

EvaluateJsonPath config (run duration 0.05 ms, 4 concurrent tasks per node):

 

[Screenshot: EvaluateJsonPath processor configuration (Onkar_Gagre_1-1646826304669.png)]

 

 

1 ACCEPTED SOLUTION

Contributor

This is now sorted. Batching using record-based processors helped us improve performance many times over. I didn't know that an excess of FlowFiles could result in slowness in NiFi.

 


5 REPLIES

Super Guru

@Onkar_Gagre ,

 

The ConsumeKafka processor will consume every single message from Kafka as a separate FlowFile, which is very inefficient and makes NiFi very slow to process all the data that you have. You should rewrite your flow using record-based processors, which will give you much better performance and throughput.
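As a rough illustration (a sketch, not your exact flow): with ConsumeKafkaRecord_2_6 feeding a record-aware processor such as QueryRecord, a single FlowFile can carry thousands of Kafka messages, and one SQL statement is applied to every record in that batch instead of creating one FlowFile per message. For example:

-- hypothetical QueryRecord query (added as a dynamic property on the processor);
-- it runs once per FlowFile and is evaluated against every record in the batch
SELECT *
FROM FLOWFILE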

 

Please watch Mark Payne's "Apache NiFi Anti-Patterns, Part 1" where he explains the concept of record-based processing and talks about what not to do in NiFi.

 

Cheers,

André

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.


Contributor

Thanks. But I am now stuck on how to fetch nested (cascaded) fields in the QueryRecord processor. Every time I use a nested field in the SQL, it throws an error saying the column is not found in the table.

Below is the Avro schema. I am not able to access "cucsMemoryUnitInstanceId" or "site_identifier", but I am able to access "producedAt" and "name", which are present at the first level.

 

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "labels",
      "type": {
        "name": "labels",
        "type": "record",
        "fields": [
          { "name": "__name__", "type": "string" },
          { "name": "cucsMemoryUnitInstanceId", "type": "string" },
          { "name": "instance", "type": "string" },
          { "name": "job", "type": "string" },
          { "name": "monitor", "type": "string" },
          { "name": "site_identifier", "type": "string" }
        ]
      }
    },
    { "name": "name", "type": "string" },
    { "name": "timestamp", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "producedAt", "type": "string" }
  ]
}
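For reference, here is a sketch of the kind of query I expect to need, assuming QueryRecord's RPATH_STRING record-path function (available in recent NiFi versions) is the right way to reach the nested fields; the aliases are only for illustration:

-- hypothetical query against the FLOWFILE table; RPATH_STRING pulls values
-- out of the nested "labels" record using a record path
SELECT
  name,
  producedAt,
  RPATH_STRING(labels, '/cucsMemoryUnitInstanceId') AS cucsMemoryUnitInstanceId,
  RPATH_STRING(labels, '/site_identifier') AS site_identifier
FROM FLOWFILE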

 

Super Mentor

@Onkar_Gagre 

Is the $.name field unique for every record or do batches of records share the same $.name value?

If they are not unique, did you consider using the ConsumeKafkaRecord processor feeding a PartitionRecord processor to split your records by common name values?

This would still allow you to work with batches of records rather than an individual record per FlowFile.
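A minimal sketch of that PartitionRecord configuration (the dynamic property name is arbitrary; its value is a RecordPath):

name = /name

Each outgoing FlowFile would then contain only records sharing one value of /name, and that value would be written to a "name" attribute you can route or group on.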

It might also be helpful if you shared the details of your end-to-end use case, as that may give folks the ability to offer even more dataflow design options.

Thanks,

Matt

Contributor

Thanks Matt.

The name field is different for each record's JSON object. I will try using PartitionRecord with a JsonTreeReader and see if I can segregate the flow based on the name field and get it processed in batches.