Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

NiFi Consume Kafka 010 not ingesting messages but Producer and Consumer work

avatar
Explorer

Hi,

I feel like this question has been asked before in similar ways but each response has yet to yield a result that has yet to work for me. So, I have an Amazon Linux machine with Apache Kafka 0.10.0 on it and when I start zookeeper and kafka I am able to successfully produce and consume messages on that machine.

On a separate Amazon Windows server I have Apache Nifi 1.6 installed and I am using the Consume Kafka 0_10 processor to try and receive messages from the topic that was created on my standalone kafka install (Linux machine). I have tried to use the ip address of the Linux machine as the broker within the NiFi processor with port 9092 as well as the Ec2 instance name ec2-XX-X-XXX-XXX.XX-XXXX-1.compute.amazonaws.com:9092 but neither show any data coming in when I manually type in messages from the Kafka producer on the Linux machine. There are no errors produced in the logs of NiFi and I I know that port 9092 is open so I would think the messages would come through.

In the server.properties file for Kafka I have the listeners set to listeners=PLAINTEXT://:9092 (bases on some stack overflow post recommendation). In that same file I also have advertised.listeners=PLAINTEXT://XX-X-XXX-XXX.XX-XXXX-1.compute.amazonaws.com:9092.

I have also tried NiFi 1.5 and to see if this was a version issue but nothing seems to work. I would think that if Nifi is not producing errors, the proper ports are open and I can produce and consume locally to Linux machine then I would think there should be no reason the Consume Kafka 0_10 couldn't pick up the messages I type and parse them into a flow file.

Sorry for the long question but if anyone has some insight on this issue please let me know.

Thanks, Jason

13 REPLIES 13

avatar
@Jason Sphar

What version of windows is being used?

avatar
Explorer

@Wynner - Nifi 1.6 is running on Windows Server 2016 hosted on Amazon Ec2

avatar

Hey @Jason Sphar what about the port from zookeeper? (2181 by default).
And could you check if there's any extra consumer group (probably used by Nifi Processor).
Hope this helps!
Ah and kafka logs are showing something?

avatar
Explorer

@Vinicius Higa Murakami -- Regarding your questions, I have posted my responses below:

What about the port from zookeeper? (2181 by default). - Do you mean by using port 2181 instead of port 9092 in the Consume Kafka Processor? If so, I have open port 2181 and have changed the Kafka broker to ec2-XX-X-XXX-XXX.XX-XXXX-1.compute.amazonaws.com:2181 but still nothing came over.

And could you check if there's any extra consumer group (probably used by Nifi Processor) - By extra consumer group do you mean another processor that is also trying to consume the same topic on that same port? If so, I can confirm that I have only one processor trying to access Kafka in this NiFi model. I had the same set up in NiFi 1.5 but that processor has been stopped and that instance of NiFi is not running.

Kafka logs are showing something? - I have checked the Controller logs and the server logs and I only see the following:

From the Controller Logs 2018-06-06 17:55:15,642] TRACE [Controller 0]: checking need to trigger partition rebalance (kafka.controller.KafkaController) [2018-06-06 17:55:15,642] DEBUG [Controller 0]: preferred replicas by broker Map(0 -> Map([test,0] -> List(0))) (kafka.controller.KafkaController) [2018-06-06 17:55:15,642] DEBUG [Controller 0]: topics not in preferred replica Map() (kafka.controller.KafkaController) [2018-06-06 17:55:15,643] TRACE [Controller 0]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController)

I do not see any errors or warnings.


Kafka is running on a Free Tier Linux within in Amazon Ec2 so I know there are restrictions of size and memory usages but since I was able to configure Kafka to use less memory it works fine within that local environment. I don't think that is the issue here but just thought to mention it.


Jason

avatar

@Jason Sphar
And could you check if there's any extra consumer group (probably used by Nifi Processor)
Sorry about the mess, actually I mean by trying to see in command line if there's any consumer group for nifi processor.
You can try this one, to see if Nifi is able to reach kafka and create a consumer group.

/usr/<yourKafkapath>/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --list

And if you found smtg, can try to describe it:

/usr/<yourKafkapath>/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-hotst:9092 --describe --group consumer_anyname_2390934

Kafka is running on a Free Tier Linux within in Amazon Ec2 so I know there are restrictions of size and memory usages but since I was able to configure Kafka to use less memory it works fine within that local environment. I don't think that is the issue here but just thought to mention it.

Yeah agree on that, It shouldn't be a problem, since you can work with kafka manually.

Hope this helps 🙂

avatar
Explorer

@Vinicius Higa Murakami -- So I tried the first command and it returned console-consumer-96628. I then tried to use that name in the second command like this but think I got something wrong because it doesn't accept it: bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-96628.

I was also talking to a co-worker who uses Kafka more and we were wondering if I have to create a consumer group and then assign my topic to that group. Then on the Nifi end I would have to supply that proper Group Id in the processor? So far I usually just put some value like 0 in for the Group Id on the processor and hoped it would connect directly without actually needing the group.

Thanks for your help!

Jason


avatar

Hey @Jason Sphar !
Hmm that's strange, did you try to run with zookeeper parameter?

/bin/kafka-consumer-groups.sh --zookeeper ZKHOSTS:2181 --describe --group console-consumer-96628

Hope this helps! 🙂

avatar
Explorer

@Vinicius Higa Murakami -- I just tried the command again with the zookeeper parameter and got the following:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

Could not fetch offset from kafka for group console-consumer-96628 partition [test,0] due to org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group..

console-consumer-96628 test 0 unknown 56 unknown none

Any thoughts you have on this output would be greatly appreciated. Thanks again for your help! Jason

avatar

Hey @Jason Sphar.

Hmm, at least it give us a clue.
AFAIK the old versions of kafka used to accept zookeeper as a way to access the kafka cluster. And now you have this boostrap way (by passing any broker to reach the leader of partition + zk).
For example, i did a manual test in kafka, by creating a kafka topic, feeding with kafka-console-producer and getting the kafka-consumer-group, with the bootstrap-server parameter. I'm kinda confused now, cause in my humble opinion its seems that Nifi Processor has created the consumer-group on kafka, but kafka is unable to use..

[root@node1 ~]# kafka-topics.sh --zookeeper $ZKENSEMBLE --create --topic vin-hcc-nifi --partitions 3 --replication-factor 3
Created topic "vin-hcc-nifi".
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:6667 --topic vin-hcc-nifi
>Im brazilian and im testing this topic 
>please work!
>HCC
[root@node2 ~]# kafka-console-consumer.sh --bootstrap-server node2:6667 --topic vin-hcc-nifi --from-beginning
Im brazilian and im testing this topic
please work!
HCC[root@node2 ~]# kafka-consumer-groups.sh --bootstrap-server node2:6667 --list 
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-14185
console-consumer-81664
[root@node2 ~]# kafka-consumer-groups.sh --bootstrap-server node2:6667 --describe --group console-consumer-81664
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'console-consumer-81664' has no active members.
TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
vin-hcc-nifi                   0          1               1               0          -                                                 -                              -
vin-hcc-nifi                   2          1               1               0          -                                                 -                              -
vin-hcc-nifi                   1          1               1               0          -                                                 -                              -

Well, anyway I saw some interesting points about the consumer -group.
========================================================

Checking consumer position

Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this:

1 2 3 4 5 6 7 8> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-groupNote: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmy-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2

This tool also works with ZooKeeper-based consumers:

1 2 3 4 5 6 7 8> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-groupNote: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-IDmy-topic 0 2 4 2 my-group_consumer-1my-topic 1 2 3 1 my-group_consumer-1 my-topic 2 2 3 1 my-group_consumer-2

========================================================

Link: https://kafka.apache.org/10/documentation.html

So after that, I made a research and fount this KB Article made by @mrodriguez (link below), we can try this to solve your problem 🙂

https://community.hortonworks.com/content/supportkb/175137/error-the-group-coordinator-is-not-availa...

Hope this helps! And a special thanks for @mrodriguez
Cheers