Using NiFi 1.3.0, I'm trying to build an adapter that streams records from a database table to Kafka, while converting them to our internal data model standards. I use the QueryDatabaseTable processor to get records that are automatically converted to a flat Avro schema. Now I want to convert fields to a nested structure, e.g. when I get two string fields 'firstname' and 'lastname' from the database table, I want to convert them to a schema where there is a (new) record called Customer that contains these two fields.
I have tried using the UpdateRecord processor for this, with a reader for the embedded Avro schema in the query results and an Avro record set writer with my target schema. I'm setting the replacement strategy of the processor to 'Record Path Value', and adding properties like this: '/Customer/firstname' => 'firstname', '/Customer/lastname' => 'lastname'. However, the resulting file has a 'Customer' record that is null, and a NullPointerException is thrown (I suspect because my target schema does not allow the Customer record to be null).
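For concreteness, the target schema I'm writing with looks roughly like this (simplified to the two fields mentioned; names are illustrative). Note that the Customer field is a plain record type, not a union with null, which is why I suspect the null value blows up on write:

```
{
  "type": "record",
  "name": "TargetRecord",
  "fields": [
    {
      "name": "Customer",
      "type": {
        "type": "record",
        "name": "Customer",
        "fields": [
          {"name": "firstname", "type": "string"},
          {"name": "lastname", "type": "string"}
        ]
      }
    }
  ]
}
```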
This leads me to believe that the UpdateRecord processor cannot be used to create new nested records. My question is: is that correct? If so, what would be the right way to go about this transformation? If not, what am I doing wrong in my approach?
Any thoughts are greatly appreciated.
We're now going to use the ExecuteScript processor to accomplish this mapping in Python, which seems more than capable. We're using this post as inspiration/boilerplate: https://community.hortonworks.com/articles/35568/python-script-in-nifi.html
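Stripped of the NiFi/Avro plumbing (which the linked article covers), the core of what the script does is just regrouping flat fields into sub-records. A minimal sketch, with illustrative names:

```python
def nest_record(flat, groups):
    """Move flat fields into nested sub-records.

    flat:   dict representing one flat record, e.g. from QueryDatabaseTable
    groups: dict mapping a sub-record name to the flat field names it absorbs
    """
    out = dict(flat)  # copy so the input record is left untouched
    for sub_name, field_names in groups.items():
        # pop each flat field and place it inside the new nested record
        out[sub_name] = {name: out.pop(name, None) for name in field_names}
    return out


# Example: fold 'firstname' and 'lastname' into a nested 'Customer' record
nested = nest_record(
    {"firstname": "Ada", "lastname": "Lovelace", "id": 1},
    {"Customer": ["firstname", "lastname"]},
)
# nested == {"id": 1, "Customer": {"firstname": "Ada", "lastname": "Lovelace"}}
```

In the actual ExecuteScript flow this function would sit between reading the incoming Avro records and writing them back out with the nested target schema.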
However, it does not feel like the 'proper' NiFi way.