Read data from Kafka with NiFi

New Contributor

I read data from Kafka with the ConsumeKafka processor and send it to a MergeContent processor, because I want to merge every hundred thousand records into one file. The merged files then go to an UpdateAttribute processor, because I want to change the filename, and finally to a PutSFTP processor, because I want to send the data to an external server. My problem is that data arrives from Kafka at 100 thousand records per second, but after going through this route the output sends only about 28 thousand records per second to the server, and the lag value in the Kafka consumer increases over time. My VM has a 14-core CPU and 21 GB of RAM. The topic I read from has 30 partitions, so I set the thread count to 30 in the ConsumeKafka processor. Do you have any suggestions for how I can increase the speed to reduce and eliminate the lag?

Screenshot (69).png

 


Community Manager

@zizo, Welcome to our community! To help you get the best possible answer, I have tagged in our Kafka and NiFi experts @MattWho @araujo @Babasaheb who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager



Master Mentor

@zizo 

The ConsumeKafka processor acts as a Kafka consumer group. So it makes sense that you set 30 concurrent tasks (1 per partition), assuming this is a single instance of NiFi. If you had a 3-node NiFi cluster, you would set the concurrent tasks to 10 (10 x 3 nodes = 30 consumers). What version of Kafka Server are you consuming from?
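The sizing rule above (one consumer per partition, spread across the nodes) can be sketched in a few lines of Python. The function name is made up for illustration and is not part of NiFi; rounding up for uneven splits is an assumption of this sketch.

```python
def concurrent_tasks_per_node(partitions: int, nodes: int) -> int:
    """Concurrent tasks to set on each node so every partition has a consumer.

    Hypothetical helper, not a NiFi API. Rounds up when the partitions do not
    divide evenly, which leaves a few consumers idle instead of leaving a
    partition without its own consumer.
    """
    if partitions <= 0 or nodes <= 0:
        raise ValueError("partitions and nodes must be positive")
    return -(-partitions // nodes)  # ceiling division


single_node = concurrent_tasks_per_node(30, 1)  # the setup in this thread
three_nodes = concurrent_tasks_per_node(30, 3)  # the 3-node example above
print(single_node, three_nodes)
```

On a single node, 30 concurrent tasks also compete for the same 14 cores as every other processor in the flow, so the setting is an upper bound on useful consumers, not a guarantee of throughput.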

MergeContent is merging a minimum of 100,000 FlowFiles. You did not share any component configurations here.
Is the MergeContent configured correctly to make sure each merged FlowFile it generates contains 100,000 FlowFiles?
Generally speaking, I would recommend using two MergeContent processors in series to reduce NiFi heap memory usage. The more FlowFiles allocated to MergeContent bins, the higher the NiFi heap usage. So a first MergeContent merging, say, a minimum of 20,000 FlowFiles, followed by a second MergeContent merging a minimum of 5, would achieve the same result but with lower heap usage.
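As a rough illustration of why two merges in series give the same result, here is a minimal Python sketch. It only models the counting (20,000 x 5 = 100,000) and the largest number of items any one bin holds; it does not model NiFi's actual heap behavior, and the function names are hypothetical. Flushing a trailing partial bin is a simplification of this sketch.

```python
from typing import Iterable, Iterator, List


def merge_in_bins(items: Iterable[bytes], bin_size: int) -> Iterator[List[bytes]]:
    """Group items into bins of bin_size; a trailing partial bin is also yielded."""
    current: List[bytes] = []
    for item in items:
        current.append(item)
        if len(current) == bin_size:
            yield current
            current = []
    if current:
        yield current


def two_stage_merge(records: Iterable[bytes], first: int = 20_000, second: int = 5) -> List[bytes]:
    """Merge `first` records per bin, then merge `second` of those results per bin."""
    stage_one = (b"\n".join(b) for b in merge_in_bins(records, first))
    return [b"\n".join(b) for b in merge_in_bins(stage_one, second)]


def make_records(n: int) -> Iterator[bytes]:
    # Stand-in for the Kafka messages; the payload format is invented.
    return (f"event-{i}".encode() for i in range(n))


merged = two_stage_merge(make_records(200_000))
single = [b"\n".join(b) for b in merge_in_bins(make_records(200_000), 100_000)]

print(len(merged), merged == single)
```

In the two-stage version no bin ever tracks more than 20,000 items (stage one) or 5 items (stage two), while the single-stage version tracks 100,000 items in one bin.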

UpdateAttribute does not touch the content of a FlowFile. It simply updates metadata/attributes about a FlowFile.

PutSFTP throughput is for the most part dependent on the target SFTP server and the network between NiFi and that SFTP server. Most SFTP servers only allow a maximum of 10 concurrent connections from the same client. Did you configure this processor with 10 concurrent tasks? Having a NiFi cluster would allow multiple NiFi nodes to send data concurrently to the SFTP server (10 concurrent tasks x 3).

Are you saying the FlowFiles start queuing up at the PutSFTP processor, eventually leading to backpressure being applied all the way back through your dataflow until it reaches the ConsumeKafka processor?

Have you looked at CPU, disk I/O, network bandwidth and speed, and NiFi heap usage?

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.

Thank you,

Matt

New Contributor

Thanks for your response.

I don't have enough resources to build a NiFi cluster with 3 nodes. I use just 1 node, and the FlowFiles arrive at 100,000 per second, not per minute. Here are more details.

Screenshot (70).png

 

Screenshot (71).png

 

Screenshot (72).png

 

Screenshot (73).jpg

 

Super Collaborator

ConsumeKafka has a property called "Message Demarcator". Click on it and press Shift+Enter to enter a newline as its value; instead of pulling 1 event at a time, the processor will then create a single FlowFile containing several events, which might make your merge perform even better. You can do the same thing on the Publish side: merge on a newline (Shift+Enter) and configure the same demarcator, and you'll achieve greater throughput.

joseomjr_0-1686778741033.png
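To make the demarcator idea concrete, here is a small Python sketch of what bundling does to the data. It is not NiFi code: the function names and the sample messages are invented, and the newline demarcator is the one suggested above.

```python
from typing import List


def bundle_messages(messages: List[bytes], demarcator: bytes = b"\n") -> bytes:
    """Concatenate one batch of messages into a single FlowFile-like payload."""
    return demarcator.join(messages)


def split_bundle(content: bytes, demarcator: bytes = b"\n") -> List[bytes]:
    """Recover the individual messages from a bundled payload."""
    return content.split(demarcator)


# Hypothetical batch of Kafka messages received together.
batch = [b'{"id": 1}', b'{"id": 2}', b'{"id": 3}']

flowfile_content = bundle_messages(batch)
recovered = split_bundle(flowfile_content)

# One payload now carries three events instead of three separate FlowFiles.
print(flowfile_content)
print(len(recovered))
```

The round trip only works if the demarcator never appears inside a message, so a newline suits single-line JSON or CSV records but not payloads that contain embedded line breaks. Fewer, larger FlowFiles also mean fewer objects for the downstream MergeContent to bin.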