Support Questions

Find answers, ask questions, and share your expertise

Trouble connecting to Kafka from NiFi using containers

avatar
New Contributor

I'm trying to setup NiFi and Kafka so I can better understand how they communicate and how best to design a solution around them. I setup a test by starting containers for Kafka (as well as ZooKeeper) and NiFi. I wanted to test creating a flow in NiFi that would connect to the Kafka container using a PublishKafka processor. I'm testing this using these containers from Docker Hub:

  • apache/nifi
  • ubuntu/kafka

I ran ZooKeeper then Kafka then ran nifi and logged into NiFi and started to setup my flow. For the PublishKafka processor, I simply entered: localhost:9092 and after successfully creating the topic "INFO" in Kafka, I entered INFO as the topic in the processor. 

 

My flow starts with a tailFile, splitText, RouteonContent the works fine but when it gets to the PublishKafka, I get these errors:

 

PublishKafka_2_0[id=853da33d-0184-1000-f5ef-74ba8ae4f4a2]
Processing halted:yeilding [1 sec]:
org.apache.kafka.common.errors.TimeoutException:Timeout
expired while initializing transactional state in 5000ms.

 

I've tried several things unsuccessfully. Example, since the Kafka config in the container doesn't set listeners and advertised listeners, I tried configuring that following different articles, but nothing worked. The last thing I tried was to try these settings in my server.properties:

 

# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
inter.broker.listener.name=INTERNAL

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT

 

After restarting, I tried changing my Kafka Brokers to localhost:9094 which still didn't work.

 

I could use some help as I don't feel I'm getting closer hunting for an answer. There's so much to learn beyond this initial step! Any help is appreciated as I'm new to this. Thank you!

0 REPLIES 0