Created on 03-09-2022 03:48 AM - last edited on 03-09-2022 01:19 PM by cjervis
Hi,
We are trying to fetch data from one of our Kafka clusters (200,000 messages per second), where the topic has 19 partitions, using a NiFi cluster of 4 hosts with the built-in ConsumeKafka_2_6 processor. However, we are observing delays in processing messages (lineage time of 7 seconds per message). We tried increasing the concurrent tasks to match the partition count, but it did not help. If we use a demarcation strategy (to club multiple messages into a single FlowFile) it works fine, but we need individual messages for data transformation.
The downstream processor that does the JSON evaluation has a similar problem, as it cannot handle the large volume of data coming from the ConsumeKafka processor.
Notes: the backpressure object threshold is 100,000 FlowFiles and the size threshold is 1 GB.
Please find below a reference screenshot of the processor configuration.
EvaluateJsonPath config: run duration 0.05 ms, 4 concurrent tasks per node.
Created 03-09-2022 04:56 AM
The ConsumeKafka processor will consume every single message from Kafka as a separate FlowFile, which is very inefficient and makes NiFi very slow to process the volume of data that you have. You should rewrite your flow using record-based processors, which will give you much better performance and throughput.
Please watch Mark Payne's "Apache NiFi Anti-Patterns, Part 1" where he explains the concept of record-based processing and talks about what not to do in NiFi.
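For example, a minimal record-based consumer setup could look like this (a sketch only; the broker, topic, and group values are placeholders, and property names should be checked against your NiFi version):

ConsumeKafkaRecord_2_6
    Kafka Brokers    : broker1:9092,broker2:9092    (placeholder)
    Topic Name(s)    : your_topic                   (placeholder)
    Group ID         : nifi-consumer-group          (placeholder)
    Record Reader    : JsonTreeReader
    Record Writer    : JsonRecordSetWriter
    Max Poll Records : 10000

With 19 partitions on a 4-node cluster, roughly 4-5 concurrent tasks per node gives you one consumer per partition; any consumers beyond 19 in total would just sit idle.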
Cheers,
André
Created 03-10-2022 12:51 AM
Thanks. But I am now stuck on how to fetch nested fields in the QueryRecord processor. Every time I reference a nested field in the SQL, it throws an error saying no column was found in the table.
Below is the Avro schema. I am not able to access "cucsMemoryUnitInstanceId" or "site_identifier", but I am able to access "producedAt" and "name", which are at the top level.
{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "labels",
      "type": {
        "name": "labels",
        "type": "record",
        "fields": [
          { "name": "__name__", "type": "string" },
          { "name": "cucsMemoryUnitInstanceId", "type": "string" },
          { "name": "instance", "type": "string" },
          { "name": "job", "type": "string" },
          { "name": "monitor", "type": "string" },
          { "name": "site_identifier", "type": "string" }
        ]
      }
    },
    { "name": "name", "type": "string" },
    { "name": "timestamp", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "producedAt", "type": "string" }
  ]
}
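In plain SQL terms, what I want is the equivalent of the following (a sketch; RPATH_STRING is my reading of the QueryRecord documentation for accessing nested records, not something I have confirmed works on our version):

SELECT
    name,
    producedAt,
    RPATH_STRING(labels, '/cucsMemoryUnitInstanceId') AS cucsMemoryUnitInstanceId,
    RPATH_STRING(labels, '/site_identifier') AS site_identifier
FROM FLOWFILE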
Created 03-14-2022 11:21 PM
This is now sorted. Batching with record-based processors improved our performance many times over. I didn't know that an excess of FlowFiles could slow NiFi down like this.
Created 03-09-2022 11:48 AM
@Onkar_Gagre
Is the $.name field unique for every record or do batches of records share the same $.name value?
If they are not unique, did you consider using the ConsumeKafkaRecord processor feeding a PartitionRecord processor to split your records by common name values?
This would still allow you to work with batches of records rather than an individual record per FlowFile.
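A sketch of what the PartitionRecord side could look like (the dynamic property name is your choice, its value is a RecordPath, and the reader/writer choices here are placeholders):

PartitionRecord
    Record Reader : JsonTreeReader
    Record Writer : JsonRecordSetWriter
    name          : /name    <-- user-defined (dynamic) property; RecordPath to partition on

Each outgoing FlowFile would then contain only records that share the same /name value, with that value also written to a FlowFile attribute called "name".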
Also, it might be helpful if you shared the details of your end-to-end use case, as that may give folks the ability to offer even more dataflow design options.
Thanks,
Matt
Created 03-10-2022 09:05 PM
Thanks Matt.
The name field is different for each record JSON object. I will try using PartitionRecord with a JsonTreeReader and see if I can segregate the flow based on the name field and get it processed in batches.