Read binary avro data from Kafka using Nifi
Labels: Apache NiFi
Created ‎08-31-2022 11:27 AM
I want to read binary Avro data from Kafka using the ConsumeKafka processor. That part works, but the content type of the resulting flowfile is "application/octet-stream". I cannot view it either: NiFi says "No viewer is registered for this content type". I also cannot convert this Avro data to JSON, since the content type is octet-stream. But when I use "kafka-avro-console-consumer" on the console, the data is shown as JSON. How can I get this JSON data into NiFi?
Created ‎09-04-2022 10:21 PM
@syntax_ ,
It seems to me that your source Kafka is a Confluent Kafka cluster and the producer uses Schema Registry to source the schema. In this case, the KafkaAvroSerializer prepends 5 bytes to every message it produces to indicate the ID of the schema that was used (in your case, schema ID 34). If you try to read this message as a pure Avro payload, the deserialization will fail because those 5 bytes are not part of the Avro payload.
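To make the 5-byte prefix concrete, here is a minimal Python sketch of how such a message could be split, assuming the usual Confluent framing of one zero "magic" byte followed by a 4-byte big-endian schema ID. The function names are made up for illustration and are not part of NiFi or any Kafka client. The second helper checks for the `Obj` + 0x01 header that an Avro data file starts with; a framed Kafka message does not start that way, which is probably what the "Not an Avro data file" error reported elsewhere in this thread is about.

```python
import struct
from typing import Tuple

CONFLUENT_MAGIC = 0            # first byte of a Confluent-framed message
AVRO_FILE_MAGIC = b"Obj\x01"   # first four bytes of an Avro data (container) file


def split_confluent_message(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message.

    Hypothetical helper: 1 magic byte + 4-byte big-endian schema ID + payload.
    """
    if len(message) < 5:
        raise ValueError("message is shorter than the 5-byte prefix")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != CONFLUENT_MAGIC:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]


def looks_like_avro_file(data: bytes) -> bool:
    """True if the bytes start with the Avro data file header."""
    return data.startswith(AVRO_FILE_MAGIC)


# Made-up sample: prefix for schema ID 34 followed by two arbitrary payload bytes.
sample = struct.pack(">bI", 0, 34) + b"\x02a"
schema_id, payload = split_confluent_message(sample)
print(schema_id, payload, looks_like_avro_file(sample))
```

A reader that expects an embedded schema looks for the data file header, so it would reject `sample` even though the bytes after the prefix are valid Avro.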
So, the best way to handle this in NiFi is to also use Schema Registry to deserialize Avro messages. With this, NiFi will get the schema ID from the message 5-byte prefix, use that ID to retrieve the correct schema from Schema Registry and then correctly deserialize the Avro payload.
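The lookup described above can be sketched roughly as follows. This is only an illustration under loud assumptions: `REGISTRY` is a hypothetical in-memory stand-in for Schema Registry, and its field list is guessed from the JSON output posted in this thread, not taken from the real schema with ID 34. The hand-written decoder only covers Avro `string` and `long` fields; in NiFi, the Avro Reader and the registry controller service do this work for you.

```python
import io

# ASSUMPTION: stand-in for Schema Registry, mapping schema ID -> ordered fields.
# The field names and types are inferred from the JSON shown in this thread.
REGISTRY = {
    34: [
        ("comment", "string"), ("timestamp", "long"), ("job_state", "string"),
        ("host", "string"), ("job_id", "string"), ("job_user", "string"),
        ("job_group", "string"),
    ],
}


def read_long(buf: io.BytesIO) -> int:
    """Decode an Avro long: a variable-length, zigzag-encoded integer."""
    raw, shift = 0, 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise EOFError("truncated varint")
        raw |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:      # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)  # undo zigzag


def read_string(buf: io.BytesIO) -> str:
    """Decode an Avro string: a long length followed by UTF-8 bytes."""
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


READERS = {"string": read_string, "long": read_long}


def decode_message(message: bytes) -> dict:
    """Read the schema ID from the prefix, look it up, decode the record."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent-framed message")
    schema_id = int.from_bytes(message[1:5], "big")
    fields = REGISTRY[schema_id]    # KeyError if the ID is unknown
    buf = io.BytesIO(message[5:])
    return {name: READERS[kind](buf) for name, kind in fields}
```

As a side note, schema ID 34 written as 4 big-endian bytes ends in 0x22, which prints as `"`, and a 50-character string gets the length byte 0x64, which prints as `d`. That appears consistent with the `"d` at the start of the raw message pasted later in this thread.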
Considering that my guess is correct and you're using a Confluent Schema Registry, you should create a new ConfluentSchemaRegistry controller service and configure it with the details of your Schema Registry. Once this is done, edit the configuration of the Avro Reader controller service and set the following:
After you do this, your flow should be able to correctly process the messages that you're reading from Kafka.
I read the binary message that you sent me with NiFi and loaded the schema into my local Schema Registry service (making sure it was assigned the right ID, 34), and I was able to successfully convert the message from Avro to JSON using a ConvertRecord processor:
Cheers,
André
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Created ‎08-31-2022 06:32 PM
@syntax_ ,
I believe you have a schema that you can use to parse your Avro data, right?
Instead of using ConsumeKafka, use the ConsumeKafkaRecord processor. In that processor, specify a Record Reader of type AvroReader and provide the correct schema so that the reader can properly deserialize your data.
If you want to convert the data to JSON, you can then specify a JsonRecordSetWriter as the Record Writer for that processor, so that the output flowfiles will be in that format and you'll be able to inspect the content of the queues.
Cheers,
André
Created ‎08-31-2022 09:22 PM
I used the ConsumeKafkaRecord processor as well and set the Record Reader to an Avro reader. But when I run it, it fails with the error "invalidMagicException: Not an Avro data file". Is there something I am missing here?
Created ‎09-01-2022 02:25 AM
Would you be able to save one of these messages to a file and share it with me?
Created ‎09-01-2022 03:16 AM
Yes, sure. Do you need the data that I am sending to NiFi or the data that has been processed by NiFi?
Created ‎09-01-2022 03:25 AM
If you could provide both it would help.
Created ‎09-01-2022 04:04 AM
Here is the output for "kafka-avro-console-consumer --from-beginning --bootstrap-server admin:9092 --topic pbs_jobs"
{"comment":"Job run at Thu Aug 18 at 04:21 on (node01:ncpus=1)","timestamp":1660963434048,"job_state":"Job Completed","host":"admin","job_id":"65.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:22 on (node01:ncpus=1)","timestamp":1660963434048,"job_state":"Job Completed","host":"admin","job_id":"66.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:22 on (node01:ncpus=1)","timestamp":1660963434048,"job_state":"Job Completed","host":"admin","job_id":"67.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:23 on (node02:ncpus=10)","timestamp":1660963434048,"job_state":"Job Completed","host":"admin","job_id":"68.admin","job_user":"root","job_group":"root"}
{"comment":"Not Running: Insufficient amount of resource: ncpus (R: 10 A: 1 T: 16)","timestamp":1660963434048,"job_state":"Job Queued","host":"admin","job_id":"69.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 07:13 on (node02:ncpus=1)","timestamp":1660963494042,"job_state":"Job Completed","host":"admin","job_id":"70.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:16 on (node01:ncpus=1)","timestamp":1660963494042,"job_state":"Job Completed","host":"admin","job_id":"64.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:21 on (node01:ncpus=1)","timestamp":1660963494042,"job_state":"Job Completed","host":"admin","job_id":"65.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:22 on (node01:ncpus=1)","timestamp":1660963494042,"job_state":"Job Completed","host":"admin","job_id":"66.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:22 on (node01:ncpus=1)","timestamp":1660963494042,"job_state":"Job Completed","host":"admin","job_id":"67.admin","job_user":"root","job_group":"root"}
{"comment":"Job run at Thu Aug 18 at 04:23 on (node02:ncpus=10)","timestamp":1660963494042,"job_state":"Job Completed","host":"admin","job_id":"68.admin","job_user":"root","job_group":"root"}
But when I send this data to NiFi using ConsumeKafkaRecord:
I get this as output:
Created ‎09-01-2022 10:09 PM
I actually wanted to have a look at the binary Avro data that is in Kafka, not the deserialized content.
Something like this:
kafka-console-consumer --from-beginning --bootstrap-server admin:9092 --topic pbs_jobs --max-messages 1 > message.avro
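Once a message has been saved like this, its first bytes can be checked without relying on how a terminal renders binary data. The sketch below is only a suggestion; `hex_preview` is a made-up helper name, and `message.avro` is simply the file name from the command above.

```python
from pathlib import Path


def hex_preview(data: bytes, count: int = 16) -> str:
    """Return the first `count` bytes as space-separated hex pairs."""
    return " ".join(f"{b:02x}" for b in data[:count])


# Print the start of the saved message, if the file is present.
path = Path("message.avro")
if path.exists():
    print(hex_preview(path.read_bytes()))
```

A message framed with the 5-byte schema ID prefix would be expected to start with `00` followed by the four ID bytes, while an Avro data file would start with `4f 62 6a 01`.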
Cheers,
André
Created ‎09-01-2022 11:47 PM
Okay, this is the output.
It is in this format:
"dJob run at Thu Aug 18 at 04:16 on (node01:ncpus=1)▒▒đ▒`Job Completed
admin64.admirooroot
Created ‎09-02-2022 03:16 AM
Can you please send me that file in a private message? Copy and paste won't work 🙂
Cheers,
André
