Reduce fields in AvroReader to AvroWriter using ConvertRecord

Contributor

My AvroReader uses the embedded Avro schema (which has 20 fields). An UpdateAttribute sets the schema name, and the Avro writer looks that name up in an AvroSchemaRegistry whose schema contains only the binary data field and a few other identifying fields (so not the full schema anymore).

I was expecting this to reduce my 20 fields (in the embedded schema) to 5 fields in the Avro output, for flowfiles that contain multiple records (record.count > 1).

However, when the flowfile enters ConvertRecord with the embedded-schema AvroReader and the registry-schema Avro writer, it is not converted. I get this error: "IllegalTypeConversionException: Cannot convert value [Ljava.lang.Object;@5cc957ab of type class [Ljava.lang.Object; because no compatible types exist in the UNION"

UpdateRecord with the same settings seems to work fine, though, maybe because the schemas match closely? Can I not reduce the number of fields with ConvertRecord?

My main goal: I'm trying to figure out the best way to create an attribute (or a new field that combines a few fields in each Avro record) for the "HBase Identifier ID", so that I can then send it to PutHBaseJSON, since I don't think there is a PutHBaseRecord processor. I really hope I won't have to slow things down by using SplitJson and EvaluateJsonPath to get my custom ID as an attribute and then combine the parts (FileName+"-"+ID+"-"+Type) or something like that. There doesn't seem to be any other way to generate an ID for each record in a record.count=10 flowfile.

The reduced schema is below (the binary field plus a few identifying fields, versus the larger embedded Avro schema):

{"type": "record","name": "files","namespace": "any.data","fields": [{"name":"ID","type":["null","string"]},{"name": "NAME","type": ["null", "string"]}, {"name": "TYPE","type": ["null", "string"]}, {"name": "ID2","type": ["null", "string"]},{"name":"BINARY","type":["null","bytes"]}]}

71485-converted-record-failure.png


1 ACCEPTED SOLUTION

Master Guru
@B X

ConvertRecord can reduce the fields. In the Record Reader, set Schema Access Strategy to Use Embedded Avro Schema; in the Record Writer, define your desired schema (only the required fields).

To rename fields and create a unique field (used as the row key in HBase), use the UpdateRecord processor.

Example:-

Here is my sample flow to reduce and create/rename fields

72546-flow.png

The first 3 processors generate an Avro data file from this input data:

id,name,age,state
1,foo,20,FL
2,foo2,30,TX
3,foo3,40,CA
ConvertRecord processor:-
Record Reader: AvroReader, using the embedded Avro schema
Record Writer:-

72547-avrosetwriter.png

In the writer I set Schema Access Strategy to use the "schema text" property, but you can also use a schema registry with the schema name as the access strategy.

The Avro file has 4 fields, but the AvroRecordSetWriter writes only the id and state fields.

So the output flowfile content has only 2 fields (id, state): the content of the input flowfile has been reduced between the Avro reader and the writer.

The output flowfile is in Avro format; to view the data I used the ConvertAvroToJSON processor:

[{
 "id": "1",
 "state": "FL"
},
{
 "id": "2",
 "state": "TX"
},
{
 "id": "3",
 "state": "CA"
}]
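
To make the effect of the narrower writer schema concrete, here is a minimal Python sketch of the same projection outside NiFi. It is only an illustration of what the reader/writer pair does to each record, not NiFi code: `project_records` and `WRITER_FIELDS` are hypothetical names, and the input is the sample CSV from above.

```python
# Sketch only: mimics the effect of ConvertRecord with a narrower writer schema.
# project_records and WRITER_FIELDS are hypothetical names, not NiFi APIs.
import csv
import io
import json

INPUT_CSV = """id,name,age,state
1,foo,20,FL
2,foo2,30,TX
3,foo3,40,CA
"""

# The "writer schema": only these fields survive.
WRITER_FIELDS = ["id", "state"]


def project_records(records, fields):
    """Keep only the named fields; a field missing from a record becomes None."""
    return [{name: rec.get(name) for name in fields} for rec in records]


# The "reader": every record carries all 4 fields.
records = list(csv.DictReader(io.StringIO(INPUT_CSV)))
reduced = project_records(records, WRITER_FIELDS)
print(json.dumps(reduced, indent=1))
```

The records are never split: the whole batch goes in and the whole batch comes out, just with fewer fields per record.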

UpdateRecord:-

This processor creates and renames the fields.

72548-updaterecord.png

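Record Reader: AvroReader, using the embedded Avro schema

Record Writer:-
1. Creating a field (added as a user-defined property: the property name is the record path of the new field, the value is the expression)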

/row_id

concat( /id, '${filename}' , /state ,'${UUID()}') 

The value above concatenates id and state (read from the record path) with the flowfile's filename and a generated UUID. This gives a unique id per record, which can also be used as the HBase row key.
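
As a rough illustration of what that concat produces for one record, here is a small Python sketch. `build_row_id` is a hypothetical helper, not part of NiFi, and the filename and UUID are the ones that appear in the sample output of this answer.

```python
# Sketch only: reproduces the string that concat(/id, filename, /state, UUID) builds.
# build_row_id is a hypothetical helper, not a NiFi function.
import uuid


def build_row_id(record, filename, unique_suffix=None):
    """Join id + filename + state + a UUID, with no separators, like the concat above."""
    suffix = unique_suffix if unique_suffix is not None else str(uuid.uuid4())
    return f"{record['id']}{filename}{record['state']}{suffix}"


record = {"id": "1", "state": "FL"}
row_id = build_row_id(
    record,
    "194346641658863.avro",
    "0d7a8c16-d986-4329-8c02-02662db1cb98",
)
print(row_id)
```

If a more readable key is preferred (for example the FileName-ID-Type form from the question), literal '-' arguments can be placed between the parts of the concat.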
2. Changing the field names

To rename fields, assign their values to the new desired names:

/rename_id

/id

/rename_state

/state

This takes the id and state values from the record path and assigns them to the rename_id and rename_state fields.
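
The rename step can be pictured the same way. This Python sketch is an assumption-laden model of the behaviour, not NiFi code: `rename_fields` and `RENAMES` are hypothetical names. The old fields disappear from the output only because the result is built from the new names alone, which plays the role of the writer schema.

```python
# Sketch only: models "copy the old value into a new field name, then write
# only the fields the writer schema lists". Names here are hypothetical.

# New field name -> record field it takes its value from.
RENAMES = {"rename_id": "id", "rename_state": "state"}


def rename_fields(record, mapping):
    """Build a record containing only the new names, filled from the old fields."""
    return {new: record.get(old) for new, old in mapping.items()}


records = [
    {"id": "1", "state": "FL"},
    {"id": "2", "state": "TX"},
    {"id": "3", "state": "CA"},
]
renamed = [rename_fields(rec, RENAMES) for rec in records]
print(renamed)
```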

AvroRecordSetWriter configs:-

Because we are adding new fields and renaming existing ones, the writer schema has to change accordingly.

72549-updaterecord-avrosetwriter.png

The output flowfile is in Avro format; to view the data I used the ConvertAvroToJSON processor. The new content of the flowfile has our new/renamed fields:

[ {
  "row_id" : "1194346641658863.avroFL0d7a8c16-d986-4329-8c02-02662db1cb98",
  "rename_id" : "1",
  "rename_state" : "FL"
}, {
  "row_id" : "2194346641658863.avroTX1db33ffe-142c-4afa-aba4-e1583b1d4d91",
  "rename_id" : "2",
  "rename_state" : "TX"
}, {
  "row_id" : "3194346641658863.avroCAfe096132-525b-4f72-939d-324f2e62407d",
  "rename_id" : "3",
  "rename_state" : "CA"
} ]

I have attached my flow.xml; save it and upload it as a template to your instance to follow along more easily.

188134-reduce-and-create-rename-fields.xml


6 REPLIES

Contributor

Here is an example Avro file that can be used to test a schema. It looks like ConvertRecord cannot be used when the writer schema is missing fields that the input has: instead of reducing the number of fields as expected, it fails with what is essentially an "incompatible schemas" error.

test-avro-file.txt

Contributor

OK, sure, I replied with a sample file. Its embedded schema has more fields than the schema above.

Contributor

@Shu

I am not seeing the advantage of having the sample files in JSON/CSV format; I can manipulate those myself. The problem I have is reducing Avro files or manipulating them with Record processors without converting to JSON first. I'm trying to avoid any splits (SplitJson) and EvaluateJsonPath, even though they are a nice way to create attributes or manipulate flowfile content.

Main problems include:

  • Converting Avro fields to attributes without splitting when record.count > 1.
  • Removing fields from Avro flowfile content.
  • Renaming fields in Avro flowfile content (not too important).
  • Converting an embedded Avro schema to a different AvroSchemaRegistry schema, essentially reducing fields (the topic of this post).


Contributor

This is pretty amazing. It deserves to be on a blog of some kind 🙂

Master Guru
@B X

To reduce the number of fields and rename them, we don't even need the ConvertRecord processor; a single UpdateRecord processor can do it. UpdateRecord requires at least one user-defined property (such as swapping a field name); once that property is added, the processor can both reduce and rename the fields.
Please see this article, where I reduce and rename fields in the first UpdateRecord processor.
If you only want to reduce the number of fields without changing any contents, then use the ConvertRecord processor.