Support Questions

Find answers, ask questions, and share your expertise

NiFi GetKafka ZooKeeper Connection Error

avatar
Explorer

Hi,

I'm trying to set up data flow by using a GetKafka processor to pull data from a Kafka cluster and a PutFile to save it on local file. However, I ran into connection refused error even though my server has access to the ZooKeeper connection by using telnet. (The log shows connected first and then refused...). I'm on NiFi 0.7 and Kafka 0.9, so it shouldn't be a version issue. Can anyone please help out?

org.I0Itec.zkclient.ZkClient zookeeper state changed (SyncConnected)
2016-07-26 21:15:09,690 INFO [pool-67-thread-1-SendThread(52.90.171.224:2181)] org.apache.zookeeper.ClientCnxn Opening socket connection to server 52.90.171.224/52.90.171.224:2181. Will not attempt to authenticate using SASL (unknown error)
2016-07-26 21:15:09,693 INFO [pool-67-thread-1-SendThread(52.90.171.***:2181)] org.apache.zookeeper.ClientCnxn Socket connection established to 52.90.171.224/52.90.171.224:2181, initiating session
2016-07-26 21:15:09,698 INFO [pool-67-thread-1-SendThread(52.90.171.224:2181)] org.apache.zookeeper.ClientCnxn Session establishment complete on server 52.90.171.224/52.90.171.224:2181, sessionid = 0x2e560467decc2fed, negotiated timeout = 6000
2016-07-26 21:15:09,699 INFO [pool-67-thread-1-EventThread] org.I0Itec.zkclient.ZkClient zookeeper state changed (SyncConnected)
2016-07-26 21:15:09,707 INFO [Framework Task Thread Thread-4-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-07-26 21:15:09,708 WARN [Framework Task Thread Thread-4-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[na:na]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[na:na]

Thanks!

Stephanie

1 ACCEPTED SOLUTION

avatar
Master Guru

I just realized something... GetKafka and PutKafka use the kafka-client 0.8.2, and there are newer processors in 0.7.0 called ConsumeKafka and PublishKafka which use kafka-client 0.9.0.1.

Since you are using Kafka 0.9 I think we should be using ConsumeKafka here. Lets see if that works any better, and sorry for the confusion.

View solution in original post

11 REPLIES 11

avatar
Explorer

Hi Bryan,

I've successfully get data from Kafka and save it down to HDFS in a one master one slave cluster. When I added one more slave node restart my data flow, however, only the first slave works, the newly added slave is live. When I increase the concurrent task in ConsumeKafka from 1 to 5, which is actually how many partition we have in the Kafka cluster, I get error saying 'kafkaconsumer is not safe for multi-threaded access'. Do I need set up a ConsumeKafka processor group in this circumstance to allow this data flow running in parallel across the whole NiFi cluster?

Thanks for the help,

Stephanie

avatar
Master Guru

Hi Stephanie, I'm actually not sure about that one, but I think it has more to do with the Kafka client and the number of partitions. You should be able to have a ConsumeKafka processor running on each node of your NiFi cluster and each pulling data without doing anything special. It might be good to start a new question about this specific problem with ConsumeKafka only consuming data on one node.