I am trying to push the data to Kafka topic which is running in my local system by using the PublishKafka_0_10 processors in apache Nifi.
The PublishKafka_0_10 processor is not able to push the data to kafka topic but i can able to create a topic, push the data to kafka topic and consume the data in console
below is the commands which is used :
Create Topic : bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Producer : bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic test
Consumer : bin/kafka-console-consumer.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic test --from-beginning
OS : Mac
NIFI : 1.5.0
Kafka : kafka_2.10-0.10.0.0
screenshots are attached for the reference
Please check the Kafka ACLs and make sure that the ANONYMOUS user has permissions to write data to this topic.
You can try executing below command:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect= localhost:2181 --add --allow-principal User:ANONYMOUS --operation All --topic=test --group=*
Thanks for the Response .
I have executed the below command and attached the screenshot for the reference but still the publisher is not able to push the data to Kafka topic
Executed command: "bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ANONYMOUS --operation All --topic=test --group=*"
Below is the error which i am getting in nifi logs
2019-08-21 15:34:32,004 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient Connection to node -1 could not be established. Broker may not be available.
2019-08-21 15:34:32,106 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient Connection to node -2 could not be established. Broker may not be available.
2019-08-21 15:34:32,107 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient Connection to node -3 could not be established. Broker may not be available.
Thanks, @kiranps11 .
Is Nifi running in the same host than Kafka? If yes, can you specify IP addresses for the processor instead of localhost and see how it goes?
If not, make sure that the ports are listening properly by using netstat command for example.
In nifi-app you should be able to see the producer configurations, make sure that the broker list match with the broker's hosts and security protocol (PLAINTEXT).
Hello @ManuelCalvo .
Thanks for the response .
The Nifi , kafka and zookeeper instances are running in localhost.
I have tried by specifying the IP address as well but no luck.
below is the producer configurations which i got it from the nifi app logs
acks = 0 batch.size = 16384 block.on.buffer.full = false bootstrap.servers = [127.0.0.1:9093, 127.0.0.1:9094, 127.0.0.1:9095] buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 0 max.block.ms = 5000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.fetch.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters =  metrics.num.samples = 2 metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 0 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS timeout.ms = 30000 value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
how did you managed to disable : ssl.endpoint.identification.algorithm or make it as null.
I get it as https:
How do i set ssl.endpoint.identification.algorithm to null. In my case it shows it ashttps and i am unable to override i t.