
Use only one consumer and one producer processor in NiFi to publish data to multiple Kafka topics

Contributor

I want to create a flow in NiFi where I consume data from 3 Kafka topics and produce that data into 3 different Kafka topics. Each consumed topic should map to a unique destination topic. For example:

kafka topic 1 --> produce to topic A
kafka topic 2 --> produce to topic B
kafka topic 3 --> produce to topic C

I want to use only one ConsumeKafka processor and one PublishKafka processor. Right now I am using 3 PublishKafka processors.

Can anyone suggest a better, more optimized approach? (attached screenshot: direct kafka.png)

1 ACCEPTED SOLUTION


In regards to your NiFi Expression Language: test it in a simple flow and inspect the flowfile's attributes to ensure topicName is correct. Then take it to the Kafka processor. If you are not seeing the results of the expression, something isn't right, or the property doesn't accept Expression Language. Look at the (?) on any property to see what is accepted. I suspect the Expression Language itself is invalid as well, so make sure you test and confirm the attribute is as expected before trying to use it deeper in your flow. A minimal test harness is sketched below.
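
One way to test in isolation (a sketch, not the only way; the kafka.topic value has to be simulated here because GenerateFlowFile carries no Kafka attributes):

GenerateFlowFile
  -> UpdateAttribute     simulate the consumer: kafka.topic = alerts_Consumer_Topic_Name
  -> UpdateAttribute     the expression under test: topicName = ${kafka.topic:...}
  -> LogAttribute        inspect the resulting attributes in the bulletins or in data provenance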

 

In regards to mapping: what I suggest could be DistributedMapCache, or even a flow that does a lookup against some other service. With this concept you provide mapping key/value pairs that correspond to your consumer and producer topics. When you look up a key, for example "alerts_Consumer_Topic_Name", the value would be "alerts_Producer_Topic_Name". If this mapping is stored outside of NiFi, then these values can be managed and changed outside the scope of the NiFi flow. A sketch of the idea follows.
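
A rough sketch of that concept (property names as I recall them from ConsumeKafka and FetchDistributedMapCache; producer.topic is just an illustrative attribute name, not a NiFi built-in):

ConsumeKafka                     Topic Name(s): all three consumer topics
  -> FetchDistributedMapCache    Cache Entry Identifier: ${kafka.topic}
                                 Put Cache Value In Attribute: producer.topic
  -> PublishKafka                Topic Name: ${producer.topic}

The cache itself would hold one entry per consumer topic, e.g. alerts_Consumer_Topic_Name mapped to alerts_Producer_Topic_Name.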

 

Example flow with DistributedMapCache:

 

https://github.com/ds-steven-matison/NiFi-Templates/blob/main/DistributedCache_Demo.xml


5 REPLIES


@Rohit1997jio   I do not think this is possible. You would need a method outside of the consume/produce that handles the logic for which consume topic maps to which produce topic. Then you could use a dynamic topic name in the producer. However, you would still be limited by the fact that ConsumeKafka doesn't take upstream connections.

 

In the example above, if customerTopicX is attribute based, you can just use the same attribute logic in Topic Name for a single PublishKafka versus the three seen above, as in the sketch below. That would at least clean up your flow.
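
For example (a sketch, assuming customerTopicX is populated on each flowfile upstream; PublishKafka's Topic Name property accepts Expression Language):

PublishKafka
  Topic Name: ${customerTopicX}

One processor then publishes each flowfile to whichever topic its attribute names.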

Contributor

I tried using Expression Language. kafka.topic is the attribute which contains the consumer topic, so I added an if/else condition for the different consumer topics, but the output was not the producer topic; it was the whole string mentioned below.

In UpdateAttribute I added a property: customerKafkaTopic=${kafka.topic:contains('alerts'):ifElse('${kafkaTopic1}',${kafka.topic:contains('events'):ifElse('${kafkaTopic2}',${kafka.topic:contains('ack'):ifElse('${kafkatopic3}','Not_found')}))}}

I passed this property customerKafkaTopic into PublishKafka, but it is not working.
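
For what it's worth, a balanced version of that expression might look like the sketch below: the producer-topic attributes (kafkaTopic1/2/3, assumed here to hold the three producer topic names) are referenced unquoted as embedded expressions, the closing braces and parentheses are paired up, and kafkatopic3 is corrected to kafkaTopic3, since attribute names are case sensitive:

customerKafkaTopic = ${kafka.topic:contains('alerts'):ifElse(${kafkaTopic1}, ${kafka.topic:contains('events'):ifElse(${kafkaTopic2}, ${kafka.topic:contains('ack'):ifElse(${kafkaTopic3}, 'Not_found')})})}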

Contributor

"You would need a method outside of the consume/produce that handles the logic for which consume topic maps to which produce topic."

Can you please elaborate on this more? What method would that be, and what logic would it need?

 
