Read binary Avro data from Kafka using NiFi

Contributor

I want to read binary Avro data from Kafka using the ConsumeKafka processor. Consuming the messages works, but the content type of the resulting file is "application/octet-stream". I cannot view it either; NiFi says "No viewer is registered for this content type". I cannot even convert this Avro data to JSON, since the content type is octet-stream. But when I use "kafka-avro-console-consumer" on the console, the data is displayed as JSON. How can I get this JSON data into NiFi?

1 ACCEPTED SOLUTION

Super Guru

@syntax_,

It seems to me that your source Kafka is a Confluent Kafka cluster and that the producer uses Schema Registry to source the schema. In this case, the KafkaAvroSerializer prepends 5 bytes to every message it produces (a magic byte followed by a 4-byte schema ID) to indicate which schema was used (in your case, schema ID 34). If you try to read this message as a pure Avro payload, deserialization will fail because those 5 bytes are not part of the Avro payload.
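To make the 5-byte prefix concrete, here is a minimal Python sketch that splits a message into its schema ID and Avro payload. It assumes the Confluent framing of one zero "magic" byte followed by a 4-byte big-endian schema ID; the function name `split_confluent_frame` is made up for illustration, and the sample bytes are the first bytes of the hex dump posted further down in this thread.

```python
import struct

def split_confluent_frame(data: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Assumes the layout: 1 magic byte (0) + 4-byte big-endian schema ID.
    """
    if len(data) < 5:
        raise ValueError("message is shorter than the 5-byte prefix")
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[5:]

# First bytes of the message from the xxd output in this thread.
sample = bytes.fromhex("00 00 00 00 22 64 4a 6f 62")
schema_id, payload = split_confluent_frame(sample)
print(schema_id)  # 34 (0x22)
```

Everything after the first 5 bytes is the plain Avro-encoded record, which is why a reader that does not skip the prefix fails.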

 

So, the best way to handle this in NiFi is to also use Schema Registry to deserialize the Avro messages. With this, NiFi reads the schema ID from the message's 5-byte prefix, uses that ID to retrieve the correct schema from Schema Registry, and then correctly deserializes the Avro payload.
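For anyone who wants to check by hand which schema an ID points to, the lookup step can be sketched in Python against the Schema Registry REST endpoint `GET /schemas/ids/{id}`. This is only an illustration, not what NiFi runs internally; the helper names are made up, and the base URL `http://localhost:8081` is an assumption that you would replace with your own registry address.

```python
import json
import urllib.request

def schema_url(base_url: str, schema_id: int) -> str:
    """Build the Schema Registry URL for looking up a schema by its ID."""
    return f"{base_url.rstrip('/')}/schemas/ids/{schema_id}"

def fetch_schema(base_url: str, schema_id: int):
    """Fetch a schema by ID and return it as parsed JSON.

    The registry responds with a JSON object whose "schema" field
    holds the schema itself as a JSON-encoded string.
    """
    with urllib.request.urlopen(schema_url(base_url, schema_id)) as resp:
        body = json.load(resp)
    return json.loads(body["schema"])

# Hypothetical usage (needs a reachable registry, so it is left commented out):
# schema = fetch_schema("http://localhost:8081", 34)
# print(schema["name"])
```

If your registry requires authentication or TLS settings, the request would need the corresponding headers or SSL context, which this sketch leaves out.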

 

Assuming my guess is correct and you're using a Confluent Schema Registry, you should create a new ConfluentSchemaRegistry controller service and configure it with the details of your Schema Registry. Once this is done, edit the configuration of the Avro Reader controller service and set the following:

araujo_0-1662355081260.png

After you do this, your flow should be able to correctly process the messages that you're reading from Kafka.

I read the binary message that you sent me with NiFi, loaded the schema into my local Schema Registry service (making sure it was assigned the right ID, 34), and was able to successfully convert the message from Avro to JSON using a ConvertRecord processor:

araujo_1-1662355234277.png

araujo_2-1662355261958.png
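As a cross-check outside NiFi, the same message can be decoded by hand. The Python sketch below is a simplified stand-in for a real Avro library: it only handles the `string` and `long` types that the PbsJobData schema from this thread uses, and it takes the bytes from the xxd output posted in this thread. The field list is copied from that schema; the helper names are made up for illustration.

```python
import json

# Field names and types, in order, from the PbsJobData schema in this thread.
FIELDS = [
    ("comment", "string"), ("timestamp", "long"), ("job_state", "string"),
    ("host", "string"), ("job_id", "string"), ("job_user", "string"),
    ("job_group", "string"),
]

# The full message, copied from the xxd output in this thread.
MESSAGE = bytes.fromhex(
    "0000 0000 2264 4a6f 6220 7275 6e20 6174"
    "2054 6875 2041 7567 2031 3820 6174 2030"
    "343a 3136 206f 6e20 286e 6f64 6530 313a"
    "6e63 7075 733d 3129 c4f0 c491 d760 1a4a"
    "6f62 2043 6f6d 706c 6574 6564 0a61 646d"
    "696e 1036 342e 6164 6d69 6e08 726f 6f74"
    "0872 6f6f 740a"
)

def read_long(buf, pos):
    """Decode an Avro long (zigzag varint) starting at pos."""
    shift, result = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos

def read_string(buf, pos):
    """Decode an Avro string: a long length followed by UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    end = pos + length
    return buf[pos:end].decode("utf-8"), end

def decode_record(payload):
    """Decode one record field by field; returns (record, bytes_consumed)."""
    pos, record = 0, {}
    for name, avro_type in FIELDS:
        reader = read_long if avro_type == "long" else read_string
        record[name], pos = reader(payload, pos)
    return record, pos

# Skip the 5-byte Confluent prefix before decoding the Avro payload.
record, consumed = decode_record(MESSAGE[5:])
print(json.dumps(record, indent=2))
```

In this dump one byte (0x0a) is left over after the last field; it looks like a trailing newline that was added when the message was saved to a file, though that is a guess. For real flows, the Avro Reader with Schema Registry lookup described above is the right tool, since it handles every Avro type and schema evolution.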

 

Cheers,

André

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

16 REPLIES

Contributor

I tried sending it, but there seems to be no option to attach a file. There is no insert option for files; only "insert link" is available. Can you please help me with that? Sorry, I'm new to this website.

Super Guru

@syntax_,

Please try running this command: xxd message.avro

Then you can copy and paste the output here.

Cheers,

André

Contributor

Here is the output:


00000000: 0000 0000 2264 4a6f 6220 7275 6e20 6174  ...."dJob run at
00000010: 2054 6875 2041 7567 2031 3820 6174 2030   Thu Aug 18 at 0
00000020: 343a 3136 206f 6e20 286e 6f64 6530 313a  4:16 on (node01:
00000030: 6e63 7075 733d 3129 c4f0 c491 d760 1a4a  ncpus=1).....`.J
00000040: 6f62 2043 6f6d 706c 6574 6564 0a61 646d  ob Completed.adm
00000050: 696e 1036 342e 6164 6d69 6e08 726f 6f74  in.64.admin.root
00000060: 0872 6f6f 740a                           .root.

Super Guru

Thanks, do you have the Avro schema for it?


Contributor

Yes. 

{
  "name": "PbsJobData",
  "type": "record",
  "namespace": "pbsjob",
  "fields": [
    {
      "name": "comment",
      "type": "string",
      "doc": "comment associated with the pbs job"
    },
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis",
      "doc": "timestamp of scheduler metric"
    },
    {
      "name": "job_state",
      "type": "string",
      "doc": "job status"
    },
    {
      "name": "host",
      "type": "string",
      "doc": "Name of the PBS scheduler server"
    },
    {
      "name": "job_id",
      "type": "string",
      "doc": "The job ID assigned by PBS"
    },
    {
      "name": "job_user",
      "type": "string",
      "doc": "The job user"
    },
    {
      "name": "job_group",
      "type": "string",
      "doc": "The job group"
    }
  ]
}

Contributor

Yes, it works. Thank you so much!!