I'm exploring integration possibilities between NiFi processors and Kafka. I use PutKafka to push the Avro flow generated by ExecuteSQL. I verify that the topic created in Kafka is populated with data using the kafka-console-consumer bash command. Then I try to use the GetKafka processor to pull all the data stored in the Kafka topic and write it to a local file using the PutFile processor. I find that the GetKafka processor doesn't pull the topic content.
I check whether everything is OK with kafka-avro-console-consumer (a Confluent command), and I get this exception: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I also tested GetKafka against a topic in a basic format, and it doesn't work for me.
So I have some questions: How can I deal with the Avro format using the GetKafka and PutKafka processors? Any ideas for resolving the GetKafka processor's problem pulling content from a Kafka topic?
PS: I'm testing with the latest version of Kafka (from HDP 2.4) and the latest version of NiFi (HDF 1.2).
Thank you in advance
Hi, let's try to isolate the problem. Does it only happen when Avro is in use? Can you test plain text first in your environment? The more details you provide about the flow and the data, the easier it will be to assist.
With plain text, everything works without problems. The problem comes from the Avro format.
Is there a gotcha to supporting the Avro or JSON format?
I believe it is not an issue related to the Avro library. The NiFi community is currently working on fixing issues in the Kafka processors. I'd suggest waiting for NiFi v0.6.1.
I have found that NiFi and Kafka use different things under the term "Avro". In Kafka (when Confluent's Schema Registry is in use), an Avro message is:
- a magic byte (0x00)
- a 4-byte number indicating the Schema Registry ID
- the Avro record (encoded data)

In NiFi, ConvertJSONToAvro produces Avro Data Files, which consist of:
- the Avro data file magic header (the 4 bytes Obj\x01), followed by file metadata including the writer schema
- possibly many Avro records (encoded data).
This is what causes the Kafka deserializer to throw the exception above. A possible workaround is to use JSON as the NiFi-to-Kafka communication format.
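The difference between the two layouts can be seen directly in the first bytes of a message. A minimal sketch (the function name `classify` and the `payload` variable are illustrative, not part of either API):

```python
# Sketch: telling the two "Avro" layouts apart by their leading bytes.
AVRO_OCF_MAGIC = b"Obj\x01"  # Avro Object Container File magic header
CONFLUENT_MAGIC = 0x00       # Confluent wire-format magic byte

def classify(payload: bytes) -> str:
    """Guess which framing a raw Kafka message payload uses."""
    if payload.startswith(AVRO_OCF_MAGIC):
        # What NiFi's ConvertJSONToAvro emits: a full Avro data file.
        return "avro-data-file"
    if len(payload) >= 5 and payload[0] == CONFLUENT_MAGIC:
        # Confluent framing: 0x00, then a 4-byte big-endian schema id.
        schema_id = int.from_bytes(payload[1:5], "big")
        return f"confluent-wire-format (schema id {schema_id})"
    return "unknown"
```

Running this against a message produced by NiFi would return "avro-data-file", which is exactly why Confluent's deserializer complains about an unknown magic byte.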
I am not sure if it will make a difference, but looking through the release notes for HDP 2.4, it ships Kafka version 0.9.
You should be using the PublishKafka and ConsumeKafka processors when working with that version of Kafka.
@Michal Klempa The problem here is that when you say "Kafka's Avro data" you are really talking about the Avro that Confluent expects for use with their Schema Registry. Technically that is a modified Avro framing (it prepends the schema registry ID) that isn't readable by standard Avro APIs unless you first remove the registry ID and then decode the payload as a bare record.
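The "remove the registry ID first" step mentioned above is just dropping a fixed 5-byte prefix. A sketch (the helper name is mine; the remaining bytes are a bare Avro record that can only be decoded if you also know the writer schema):

```python
def strip_confluent_header(message: bytes) -> bytes:
    """Remove the Confluent framing (magic byte 0x00 plus a 4-byte
    schema id), leaving the bare Avro-encoded record."""
    if len(message) < 5 or message[0] != 0x00:
        raise ValueError("not a Confluent wire-format message")
    return message[5:]
```

Note that the result is a schemaless binary record, not an Avro data file, so a standard Avro file reader still cannot open it; you would hand it to a datum reader together with the schema fetched from the registry.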
So keep in mind the following...
- Apache Kafka allows any bytes you want for a message and does not dictate anything related to Avro
- Apache NiFi can write any bytes you want to Apache Kafka
- Apache NiFi also has Avro-related processors, which produce the standard Avro format
- Apache Kafka + Confluent's schema registry expects a modified Avro format with a schema registry id in it
So the behavior in this ticket is expected behavior to me... The person wrote regular Avro data from Apache NiFi to Apache Kafka, then tried to use Confluent's console consumer, which does not expect regular Avro data.
Hi @Bryan Bende,
Thank you for your analysis.
Is there any way to use the Confluent Avro consumer to consume the Avro messages published from NiFi?
Can any NiFi Kafka processor publish Avro that is compatible with Confluent's requirements?
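One conceivable workaround, given the framing described earlier in the thread, is to prepend the Confluent header yourself before publishing (for example in a scripting processor). This is only a sketch of the framing itself; the function name is mine, and the `schema_id` must correspond to a schema actually registered in your Schema Registry for the target topic:

```python
def add_confluent_header(avro_record: bytes, schema_id: int) -> bytes:
    """Wrap a bare Avro-encoded record in the Confluent wire format:
    magic byte 0x00, 4-byte big-endian schema id, then the record."""
    return b"\x00" + schema_id.to_bytes(4, "big") + avro_record
```

The record passed in must be a bare binary-encoded Avro datum (no data file header), and it must have been written with the exact schema the registry stores under that id, otherwise Confluent's deserializer will fail or silently misread the bytes.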