Support Questions
Find answers, ask questions, and share your expertise

Why is Kafka not getting messages from TransformCsvToAvro Processor?

Expert Contributor

Attached is the end of the NiFi flow for the schema registry. The bottom two processors are not getting any messages.

14391-screen-shot-2017-04-03-at-44550-pm.png

Here are the connection details to Kafka from the bottom left processor (the IP addresses are intentionally masked):

ip-172-31-xx-xx.us-west-2.compute.internal:2181,ip-172-31-xxx-x.us-west-2.compute.internal:2181,ip-172-31-x-xx.us-west-2.compute.internal:2181

Here are the processor details for the bottom left processor:

14392-screen-shot-2017-04-04-at-95614-am.png

Here are the processor details for the bottom right processor:

14393-screen-shot-2017-04-04-at-100223-am.png

Running a Kafka console consumer to check for messages shows none:

./kafka-console-consumer.sh --zookeeper ip-172-31-xxx-xx.us-west-2.compute.internal:2181 --topic truck_speed_events_avro

./kafka-console-consumer.sh --zookeeper ip-172-31-xxx-xx.us-west-2.compute.internal:2181 --topic truck_events_avro
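As a side note, with Kafka 0.10 the topics can also be checked with the new consumer, which connects to a broker rather than ZooKeeper. A sketch, assuming the broker host is one of the cluster nodes and listens on the HDP default port 6667 (the masked hostname below is a placeholder):

```shell
# New-consumer equivalent of the check above: point at a broker, not ZooKeeper.
# Replace the placeholder host with an actual broker; 6667 is the HDP default port.
./kafka-console-consumer.sh \
  --bootstrap-server ip-172-31-xxx-xx.us-west-2.compute.internal:6667 \
  --topic truck_events_avro \
  --from-beginning
```

`--from-beginning` makes the check independent of consumer offsets, so any previously published messages will show up too.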

The NiFi app logs don't show any exceptions, and there are no errors on the Kafka side either.

Here are the topics, which were created with no errors:

[2017-04-04 10:24:50,718] INFO Created log for partition [truck_events_avro,0] in /kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2017-04-04 10:25:16,038] INFO Created log for partition [truck_speed_events_avro,0] in /kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2017-04-04 10:25:16,040] INFO Partition [truck_speed_events_avro,0] on broker 1003: No checkpointed highwatermark is found for partition [truck_speed_events_avro,0] (kafka.cluster.Partition)
2 Replies

@rgarcia

Kafka brokers normally listen on port 6667; that is the port that should be configured in the Kafka Brokers property of the PublishKafka_0_10 processors. The connection string above uses port 2181, which is ZooKeeper, so the processor is talking to the wrong service.
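For illustration, a corrected value for the Kafka Brokers property would look like the following (same masked hostnames as in the question, with only the port changed from ZooKeeper's 2181 to the broker port 6667):

```shell
# Wrong (ZooKeeper quorum, port 2181) - this is what the processor currently has:
#   ip-172-31-xx-xx.us-west-2.compute.internal:2181,ip-172-31-xxx-x.us-west-2.compute.internal:2181,ip-172-31-x-xx.us-west-2.compute.internal:2181
#
# Right (Kafka brokers, port 6667) - assuming the brokers run on the same nodes:
ip-172-31-xx-xx.us-west-2.compute.internal:6667,ip-172-31-xxx-x.us-west-2.compute.internal:6667,ip-172-31-x-xx.us-west-2.compute.internal:6667
```

If the brokers run on different hosts than ZooKeeper, the actual broker hostnames can be found in the `listeners` setting of each broker's `server.properties`.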

Master Guru

@rgarcia

It looks as though you have pointed your PublishKafka processor at your Kafka cluster's ZooKeeper nodes rather than at the Kafka brokers. The older PutKafka processors used ZooKeeper, but the newer PublishKafka processors connect to the brokers directly.
