Trouble connecting to Kafka from NiFi using containers

I'm trying to set up NiFi and Kafka so I can better understand how they communicate and how best to design a solution around them. I set up a test by starting containers for Kafka (as well as ZooKeeper) and NiFi. I wanted to test creating a flow in NiFi that connects to the Kafka container using a PublishKafka processor. I'm testing this using these containers from Docker Hub:

  • apache/nifi
  • ubuntu/kafka

I ran ZooKeeper, then Kafka, then NiFi, logged into NiFi, and started to set up my flow. For the Kafka Brokers setting of the PublishKafka processor, I simply entered localhost:9092, and after successfully creating the topic "INFO" in Kafka, I entered INFO as the topic in the processor.

My flow starts with TailFile, SplitText, and RouteOnContent, which work fine, but when it gets to PublishKafka, I get these errors:

 

PublishKafka_2_0[id=853da33d-0184-1000-f5ef-74ba8ae4f4a2]
Processing halted: yielding [1 sec]:
org.apache.kafka.common.errors.TimeoutException: Timeout
expired while initializing transactional state in 5000ms.

 

I've tried several things unsuccessfully. For example, since the Kafka config in the container doesn't set listeners and advertised.listeners, I tried configuring them by following different articles, but nothing worked. The last thing I tried was these settings in my server.properties:

 

# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
inter.broker.listener.name=INTERNAL

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
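
To make the settings above concrete: a Kafka client first contacts the bootstrap address, and the broker then replies with the advertised address of the listener the client connected on. The following is a minimal Python sketch, not part of Kafka or NiFi, that splits an `advertised.listeners` value into its named endpoints so it is easier to see which host and port each listener hands back. The function name `parse_listeners` is hypothetical and only used for illustration.

```python
from typing import Dict, Tuple


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Split a Kafka listener list into {listener_name: (host, port)}."""
    result = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Each entry looks like NAME://host:port
        name, _, address = entry.partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


# Value copied from the server.properties shown above.
advertised = parse_listeners("INTERNAL://kafka:9092,OUTSIDE://localhost:9094")
for name, (host, port) in advertised.items():
    print(f"{name}: clients on this listener are told to use {host}:{port}")
```

With these values, a client that connects on port 9094 is told to continue at localhost:9094, and a client that connects on port 9092 is told to continue at kafka:9092, so each of those names has to resolve to the broker from wherever the client runs.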

 

After restarting, I tried changing my Kafka Brokers to localhost:9094, which still didn't work.
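
One thing that can be checked independently of NiFi is whether the broker address is reachable at all from the place where the client runs. Unless host networking is used, `localhost` inside a container refers to that container itself, not to the Docker host or to another container. The sketch below is a plain TCP probe using only the Python standard library; the helper name `can_connect` is hypothetical, and the host names in the usage lines are taken from the configuration above as examples, not as a confirmed fix.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False


if __name__ == "__main__":
    # Example addresses from the post; run this where the client runs.
    for host, port in [("localhost", 9094), ("kafka", 9092)]:
        print(f"{host}:{port} reachable: {can_connect(host, port)}")
```

A successful probe only shows that the bootstrap port is open. The client still has to be able to reach whatever address the broker advertises for that listener.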

 

I could use some help, as I don't feel I'm getting any closer by hunting for an answer. There's so much to learn beyond this initial step! Any help is appreciated, as I'm new to this. Thank you!
