NiFi Avro Binary Bytes Reading

Contributor

Hey guys,

I have Avro files with the schema embedded in the file content (the "avro.schema" field inside the file).

They have some metadata fields and a field called BINARY that is declared with "type": "bytes".

Right now, after GetFile, I use ConvertRecord (AvroReader to JsonRecordSetWriter) and then EvaluateJsonPath, sending "BINARY" to flowfile-content and the rest of the metadata fields to flowfile-attributes (with a second EvaluateJsonPath).

Is there a better way I can do this? The binary field is being auto-converted to a JSON array: [12,241,23,55,-33,-44....]

However, I want the bytes to be put into a column in HBase (and also to be able to open "List Queue", click on a flowfile, and "Download" it to see the original file).

Of course the AvroReader to JsonRecordSetWriter has converted the binary to JSON array format. So now I don't know how to convert it back.

Also, I haven't found anything similar to EvaluateJsonPath for Avro. Is this by design? Am I missing a processor? The ExtractAvroMetadata processor only seems to extract the schema, not any other field, and nothing else seems to work.

Anyone have a link to someone parsing Avro files with a binary/bytes field that corresponds to an actual file?

4 REPLIES

Master Guru

The main reason there is no "EvaluateRecordPath" is that it only really makes sense when there is one record per flow file, and a major goal of NiFi's record processing is to have flow files with many, many records in them. It would still be valuable to have a processor like this, but it wasn't a primary use case so it hasn't been developed yet.

The conversion that is happening with the JSON array is the correct behavior for how NiFi's record handling works... the Avro type of "bytes" is treated as an "array of byte", and since JSON only really has string and numeric types, all it can really do is write an array of ints (bytes).

Unfortunately, I think the best route for you would be a custom processor that reads the content of a flow file using Avro's Java library, gets the byte[] of the binary field and writes it to the flow file's output stream to replace the content, and adds the other fields as attributes.

There are lots of examples in the NiFi code of how to read an Avro datafile; here is one from the ConvertAvroToJson processor:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/sr...
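
If it helps, here is a rough, untested sketch of what such a processor might look like. It assumes exactly one record per flow file and a bytes field named BINARY; the class and relationship names are made up, so adjust the field name and error handling for your data:

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class ExtractAvroBinaryField extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    @Override
    public Set<Relationship> getRelationships() {
        return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final Map<String, String> attributes = new HashMap<>();
        final byte[][] binaryHolder = new byte[1][];

        // Read the (assumed single) record from the Avro datafile in the flow file content
        session.read(flowFile, in -> {
            try (DataFileStream<GenericRecord> reader =
                         new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
                if (reader.hasNext()) {
                    final GenericRecord record = reader.next();
                    for (final Schema.Field field : record.getSchema().getFields()) {
                        final Object value = record.get(field.name());
                        if ("BINARY".equals(field.name()) && value instanceof ByteBuffer) {
                            // Avro "bytes" values come back as a ByteBuffer
                            final ByteBuffer buf = (ByteBuffer) value;
                            final byte[] bytes = new byte[buf.remaining()];
                            buf.get(bytes);
                            binaryHolder[0] = bytes;
                        } else if (value != null) {
                            // Everything else becomes a flow file attribute
                            attributes.put(field.name(), value.toString());
                        }
                    }
                }
            }
        });

        if (binaryHolder[0] == null) {
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // Put the metadata on the flow file and replace the content with the raw bytes
        flowFile = session.putAllAttributes(flowFile, attributes);
        flowFile = session.write(flowFile, out -> out.write(binaryHolder[0]));
        session.transfer(flowFile, REL_SUCCESS);
    }
}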

Contributor

I see, that does make sense. For example, I had to SplitJson in order to access individual records with EvaluateJsonPath. Thank you so much for these details; it makes a lot more sense now.

However, is there no processor to manipulate fields across multiple records? As in, say record.count=10 in the Avro, and I want to, say, rename the field "field_name_old" to "new-field-name" in every record...

I feel like it would be really amazing to have a processor that can bulk-manipulate Avro flowfiles (with multiple records inside).

Or, instead of renaming all fields in a multi-record Avro flowfile, let's say I want to add a field to the Avro records (in the Avro format), e.g. "ingestion-date" = now() and, say, a SHA256 hash value, before I insert them into a NoSQL database. But if I add new attributes, then I have to somehow combine those attributes with the original Avro or converted JSON content.

My main option is NOT to convert it to JSON but to keep it as Avro, if only I could figure out an easy way with NiFi to manipulate it a bit before inserting it.

I guess my main problem is: I want the binary to be put in a binary-friendly NoSQL store like HBase, and I want the metadata stored in a different, search-oriented NoSQL database like Elasticsearch or Solr. In order to do that, I need to be able to remove fields, or split each record into two flowfiles: one with the binary/bytes and one with the metadata.

One other solution might be to just convert it to a JSON record, stuff it into HBase using PutHBaseJSON, and then later pull it out for other manipulation into other databases (MongoDB, Solr, Elasticsearch)?

Two final questions:

- Do I really ever need ConvertAvroToJSON when I have ConvertRecord?
- I seem to have trouble unless I always define an AvroSchemaRegistry and cut and paste the avro.schema from inside the Avro file into my registry. I can extract the schema with something like ExtractAvroMetadata, but there's no way to auto-insert it into the registry, so it becomes problematic trying to use the embedded-schema option. I haven't found an easy way to just use the schema in the file automatically and convert it to different formats (obviously, if you convert it to JSON, you lose the schema without a schema registry... or does NiFi have a way of dynamically inserting new schemas into the AvroSchemaRegistry controller service?).

Master Guru (Accepted Solution)

You can bulk manipulate records using UpdateRecord and Record Path:

https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache...

Changing the name of a field is a little tricky because you are reading based on a schema, so the field name is part of the schema. It can be done, though, if you define both field names in the schema and have the second field default to null; then you can move the value from the first field to the second field.
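
For example (just a sketch; the field names here are made up, and note that Avro field names cannot contain hyphens, so I've used an underscore), the schema could declare both fields:

{
"name": "foo",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "field_name_old", "type": "string" },
{ "name": "field_name_new", "type": ["null", "string"], "default": null }
]
}

Then UpdateRecord could be configured with "Replacement Value Strategy" set to "Record Path Value" and a dynamic property that copies the value over:

/field_name_new = /field_name_old

If you then write with a schema that only contains field_name_new, the old field is dropped on the way out.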

Regarding splitting a record into the binary and metadata...

One thing to understand is that even though your Avro data file has a schema embedded in it, you don't have to use this schema for reading/writing in NiFi.

Imagine this scenario...

Embedded schema:

{
"name": "foo",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "field1", "type": "string" },
{ "name": "field2", "type": "bytes" }
]
}

You could define two other schemas and put them in NiFi's schema registry...

Metadata only:

{
"name": "foo",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "field1", "type": "string" }
]
}

Binary Only:

{
"name": "foo",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "field2", "type": "bytes" }
]
}

Then you can have a ConvertRecord that has the AvroReader using embedded schema, and the AvroWriter using a schema from the schema registry. So on the way out it would rewrite the Avro with only the fields from the writer schema.

So if you send your original data to two different ConvertRecord processors, one for metadata only and one for binary only, you can essentially select out the parts for each one.


ConvertRecord is preferred over the other convert processors. How the schema is accessed is based on the Schema Access Strategy in the controller services. If a record reader says "Schema By Name", then it requires a schema registry to look the schema up by name. If the Avro reader has "Embedded Schema" selected, then you shouldn't need to use a schema registry because it will use the schema embedded in the Avro file.
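
To make that concrete, the settings would look something like this (the registry schema names here are made up):

AvroReader (shared by both ConvertRecord processors):
- Schema Access Strategy: Use Embedded Avro Schema

AvroSchemaRegistry:
- foo-metadata: the "Metadata only" schema above
- foo-binary: the "Binary only" schema above

AvroRecordSetWriter for the metadata-only ConvertRecord:
- Schema Access Strategy: Use 'Schema Name' Property
- Schema Registry: AvroSchemaRegistry
- Schema Name: foo-metadata

AvroRecordSetWriter for the binary-only ConvertRecord:
- Schema Access Strategy: Use 'Schema Name' Property
- Schema Registry: AvroSchemaRegistry
- Schema Name: foo-binary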

Contributor

Thank you for this. I'm going to give it a shot. Everything sounds like it should work great, so I'll build it out next week.