Created on 08-27-2017 04:22 AM - edited 08-17-2019 06:03 PM
Hi,
We are using the NiFi processors below to build a sample pipeline data flow with Kafka:
GetFile --> RouteOnAttribute --> publishKafka_0_10
Workflow steps:
-----------------------
- We read a CSV file (50 MB) with the 'GetFile' processor, configured with the folder location of the file.
Each record is about 1 KB, so the file holds roughly 50K records. This step works.
- We then connected a 'RouteOnAttribute' processor to pick only this file from the GetFile processor. This step works.
- Finally, we connected a 'publishKafka_0_10' processor, provided a topic, and started the configured server with its properties. This works too.
The following is the issue we face when trying to publish data into a Kafka topic using the PublishKafka_0_10 1.2.0 processor of NiFi 1.2.0 in HDF 3.0.1.0-43:
- I used my Spark-Kafka consumer (a custom Spark job running on the cluster), configured with a maximum batch fetch size of 30 MB and a buffer size of 15 MB.
- However, when I run the Spark job, each batch my Spark consumer receives contains only 10 records.
NOTE: I also tried a custom Kafka producer (sample Kafka producer code) that produces 1 KB messages by iterating from 1 to 50K. With the same Spark consumer, I received much larger batches (15K records per batch).
So the issue seems to be with the PublishKafka processor, which sends only a few records to the topic.
Is there any way to tune the parameters to achieve maximum write throughput to a topic using this processor?
Please find attached the configuration I used for PublishKafka and the Spark-Kafka consumer (custom Spark code):
Thanks in advance,
Sravanthi
Created 08-28-2017 07:19 PM
Did you set the 'Message Demarcator' property on PublishKafka_0_10?
It should be set to a newline character, which you enter by pressing Shift+Enter while in the value field.
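To illustrate why the demarcator matters, here is a minimal Python sketch that models how a publisher could turn one FlowFile's content into Kafka messages. It is not NiFi's code; the helper `split_into_messages` is hypothetical. It assumes that with no demarcator the entire content becomes a single message, while with a demarcator each delimited chunk becomes its own message. As a simplifying assumption, it also skips empty chunks.

```python
from typing import List, Optional


def split_into_messages(content: bytes, demarcator: Optional[bytes]) -> List[bytes]:
    """Hypothetical model of turning one FlowFile's content into Kafka messages."""
    if not demarcator:
        # No demarcator: the entire content is sent as one single message.
        return [content]
    # With a demarcator: each delimited chunk becomes its own message.
    # Assumption for this sketch: empty chunks (e.g. after a trailing newline) are skipped.
    return [chunk for chunk in content.split(demarcator) if chunk]


# A small CSV: one header line and three records, one per line.
csv_content = b"id,value\n1,a\n2,b\n3,c\n"

print(len(split_into_messages(csv_content, None)))    # 1
print(len(split_into_messages(csv_content, b"\n")))   # 4 (header + 3 records)
print(len(split_into_messages(csv_content, b"\\n")))  # 1: a literal backslash-n never matches
```

The last line shows why typing the two characters backslash and n can behave like having no demarcator at all, if the processor uses the property value as typed.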
Also, you can increase the concurrent tasks from 1 to 2 for PublishKafka, but this would only help if you have multiple CSV files reaching the processor at the same time.
You can also check how many partitions the topic has; you'll get better read/write performance with more than one partition.
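One reason more partitions help on the consumer side is that, within a consumer group, each partition is read by at most one consumer at a time. The sketch below uses a simplified round-robin assignment to show this. It is not Kafka's actual partition assignor, and `assign_partitions` and the consumer names are hypothetical.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified round-robin assignment of topic partitions to consumers in one group.

    Not Kafka's real assignor; it only models the rule that each partition
    is owned by exactly one consumer in the group.
    """
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


consumers = ["consumer-0", "consumer-1", "consumer-2"]
print(assign_partitions(1, consumers))  # only consumer-0 gets work; the others stay idle
print(assign_partitions(6, consumers))  # two partitions per consumer
```

With a single partition, adding consumers (or parallel readers) does not increase throughput, because only one of them ever receives data.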
Created 08-29-2017 05:23 AM
Thanks a lot, Bryan. I added the newline using Shift+Enter in the value field, and I now get the batches of records I wanted. Earlier I had tried '\n' and '\\n' for this property.
Out of curiosity, shouldn't '\\n' be equivalent to Shift+Enter? Please correct me if I'm wrong.
Thanks,
Sravanthi
Created 08-29-2017 01:17 PM
Unfortunately, it depends on the processor: some processors were implemented to support '\\n' and some were not. Usually the documentation for the given property says how to enter the value.
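The difference can be sketched in Python. Typing a backslash followed by n produces two characters, while Shift+Enter produces one newline character. Whether they end up equivalent depends on whether the processor interprets escape sequences. The two functions below are hypothetical stand-ins for the two kinds of processors, not NiFi APIs.

```python
def literal_value(raw: str) -> str:
    # Hypothetical processor A: uses the property value exactly as typed.
    return raw


def escaped_value(raw: str) -> str:
    # Hypothetical processor B: interprets backslash escape sequences such as \n.
    # Assumption: the value is plain ASCII (unicode_escape is not safe for arbitrary text).
    return raw.encode("ascii").decode("unicode_escape")


typed = "\\n"       # typing a backslash followed by n: two characters
shift_enter = "\n"  # what Shift+Enter inserts: one newline character

print(len(typed), len(shift_enter))         # 2 1
print(literal_value(typed) == shift_enter)  # False
print(escaped_value(typed) == shift_enter)  # True
```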