Support Questions

Find answers, ask questions, and share your expertise

Using NiFi for Kafka streaming - real-time stream processing

Contributor

Hi,

This is my current NiFi flow, where I am consuming data in NiFi using ConsumeKafka_1_0 and publishing it using PublishKafka_1_0:

HiveMQ (MQTT) → Kafka Topic A → NiFi consumes → NiFi publishes → Kafka Topic B

My requirement is to send data in real time; I want real-time data streams like Kafka Streams.

How can I achieve that using NiFi? Is Kafka Streams available in NiFi?

I am not doing any data transformation or any other operation on the data. I am simply consuming and publishing it, but now I want to do that in real time.

4 REPLIES

Master Mentor

@Rohit1997jio 

For real-time streaming, Apache NiFi may not be the best option. NiFi works with data at rest. You have an ingest processor that consumes from Kafka (the consumed content is written to NiFi's content_repository and associated with a NiFi FlowFile, which is then moved to the outbound success relationship; the ConsumeKafkaRecord processor is likely to give better throughput). So even if you connect that success relationship directly to a PublishKafka/PublishKafkaRecord processor, you'll still have some minimal delay, since the Publish processor must wait to be scheduled to execute before it publishes a FlowFile's message (or a record FlowFile containing multiple messages) to the target cluster.

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Contributor

OK, so can I make a flow like ConsumeKafkaRecord (topic A) --> PublishKafkaRecord (topic B), using record processors for both consuming and publishing data? Will this be fast?


Master Mentor

@Rohit1997jio 

Yes, it can be fast, but that depends on your dataflow(s). How many other dataflows is your NiFi also running? All these dataflows share the same JVM resources and utilize the same server resources. Concurrency is also important: the number of partitions in your source Topic A should match the total concurrency of your ConsumeKafkaRecord processor. Concurrent tasks on the ConsumeKafkaRecord can be set using the following formula:

(num partitions in Topic A) / (num NiFi nodes in cluster) = (num concurrent tasks set on ConsumeKafkaRecord)


Example:
Kafka Topic A partitions = 12
NiFi cluster nodes = 3
Concurrent tasks set on ConsumeKafkaRecord = 4

4 x 3 = 12 consumers in the consumer group (1 consumer per partition in the source topic)
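As a quick sanity check, the sizing above can be sketched in a few lines of Python (the helper name is hypothetical, not a NiFi API):

```python
def concurrent_tasks_per_node(num_partitions: int, num_nodes: int) -> int:
    """Concurrent tasks to set on ConsumeKafkaRecord per NiFi node so the
    consumer group has one consumer per partition in the source topic."""
    if num_partitions % num_nodes != 0:
        # An uneven split leaves idle consumers or unassigned partitions;
        # round up so every partition still gets a consumer.
        return -(-num_partitions // num_nodes)  # ceiling division
    return num_partitions // num_nodes

# Example from above: a 12-partition topic on a 3-node cluster
tasks = concurrent_tasks_per_node(12, 3)
print(tasks)      # 4 concurrent tasks per node
print(tasks * 3)  # 12 consumers total in the consumer group
```

Note that if the partition count does not divide evenly by the node count, some consumers in the group will sit idle, which is why matching the two is worth planning up front.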

Also keep in mind that in NiFi each concurrent task utilizes a thread from NiFi's "Max Timer Driven Thread Count" thread pool.

MattWho_0-1760966034129.png

The default pool size is 10. This means that only 10 concurrent tasks can execute at the same time. Generally speaking, threads execute for only milliseconds, but for optimal performance you'll want to monitor your server's CPU load average and set the pool higher to maximize your throughput.

NiFi can show you the Core Load Average (1 = 100% utilization of one physical core).   

MattWho_1-1760966415499.png

So if your NiFi server has 4 cores and the load average is 4 or higher, the CPU is saturated. You can use this Core Load Average data to determine how much you can increase the "Max Timer Driven Thread Count" pool setting. NiFi recommends initially setting "Max Timer Driven Thread Count" to 2 - 4 times the number of physical cores on the NiFi node. This assumes all nodes in the NiFi cluster have the same number of physical cores. If nodes have varying numbers of physical cores, use the node with the fewest cores to set your initial pool size. Then monitor the core load average across all NiFi nodes and adjust accordingly. There is no way to configure different thread pool sizes per node in a NiFi cluster; NiFi expects all servers in a cluster to have the same hardware configuration.
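The initial-sizing rule above can also be sketched as a small calculation (hypothetical helper, not a NiFi API; the 2-4x multiplier is the guideline quoted above):

```python
def initial_max_timer_driven_threads(cores_per_node: list[int], factor: int = 2) -> int:
    """Suggested initial "Max Timer Driven Thread Count": 2-4x the physical
    core count, sized from the node with the fewest cores, because the pool
    size applies to every node in the cluster."""
    if not 2 <= factor <= 4:
        raise ValueError("guideline multiplier is 2-4x physical cores")
    return min(cores_per_node) * factor

# 3-node cluster with 8, 8, and 4 physical cores: size from the 4-core node
print(initial_max_timer_driven_threads([8, 8, 4]))            # 8
print(initial_max_timer_driven_threads([8, 8, 4], factor=4))  # 16
```

Starting at the low end (2x) and raising the pool while watching the core load average is the safer path, since a value sized for the largest node would saturate the smallest one.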

 

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt