Created 03-13-2019 09:48 PM
Hi all,
I have JSON in my FlowFile content like this:
{ "headers": { "key_1":"value_1", ...... "key_n":"value_n"}, "info": { "key_1":"value_1", ...... "key_n":"value_n"}, "payload": { "key_1":"value_1", ...... "key_n":"value_n"} }
my aim in the flow is:
I tried to implement bullet 1 with the EvaluateJsonPath processor and bullets 2.1, 2.2 and 3 with the AttributesToJSON processor, but I ran into this problem:
In AttributesToJSON, if I use flowfile-content as the Destination, the merge works, but it writes the headers, info and payload values as single escaped strings instead of nested key-value JSON objects. Here is an example:
{ "headers": "{\"key_1\":\"value_1\", ...... \"key_n\":\"value_n\"}]}}}", "info": "{\"key_1\":\"value_1\", ...... \"key_n\":\"value_n\"}]}}}", "payload": "{\"key_1\":\"value_1\", ...... \"key_n\":\"value_n\"}]}}}", "kafka.partition": "1" }
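To make the cause of the escaping concrete, here is a minimal Python sketch rather than NiFi code. It assumes that attribute values are always strings and that AttributesToJSON serializes them roughly like a plain map of strings; the sample content and the `nested_keys` name are made up for illustration.

```python
import json

# Hypothetical FlowFile content, shaped like the JSON in the question.
content = {
    "headers": {"key_1": "value_1"},
    "info": {"key_1": "value_1"},
    "payload": {"key_1": "value_1"},
}

# Extracting each object into an attribute keeps it as JSON *text*,
# because an attribute value is a string, not a nested object.
attributes = {name: json.dumps(value) for name, value in content.items()}
attributes["kafka.partition"] = "1"

# Serializing a map of strings escapes the inner quotes
# instead of emitting nested objects.
stringified = json.dumps(attributes)

# Parsing the known JSON-valued attributes back restores the nesting.
nested_keys = ("headers", "info", "payload")
restored = {
    name: json.loads(value) if name in nested_keys else value
    for name, value in attributes.items()
}

print(stringified)
print(json.dumps(restored))
```

The first printed line has the same escaped shape as the output above; the second shows the nested objects that were actually wanted.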
Any ideas?
Thank you!
Created 03-21-2019 12:55 PM
Instead of extracting the content into attributes (which will effectively turn the JSON objects into Strings), you should be able to add the core attributes using UpdateRecord or JoltTransformJSON/JoltTransformRecord. For the record processors, it looks like you could use the following schema for the reader:
{ "namespace": "nifi", "name": "myRecord", "type": "record", "fields": [ {"name": "headers","type": {"type": "map","values": "string"}}, {"name": "info","type": {"type": "map","values": "string"}}, {"name": "payload","type": {"type": "map","values": "string"}} ] }
For the subflow to add core attributes, you'd want the writer schema to include the additional fields such as kafka.partition, then in UpdateRecord for example, you could add a user-defined property /kafka.partition with value 1 (and Replacement Strategy "Literal Value"). If in the core attribute subflow you want to remove the payload field, you can just remove it from the writer schema, and it won't be included in the output. For example:
{ "namespace": "nifi", "name": "myRecord", "type": "record", "fields": [ {"name": "headers","type": {"type": "map","values": "string"}}, {"name": "info","type": {"type": "map","values": "string"}}, {"name": "uuid","type": "string"}, {"name": "kafka.topic","type": "string"}, {"name": "kafka.partition","type": "string"}, {"name": "kafka.offset","type": "long"} ] }
If I understand correctly, your headers+info+payload content would remain the same as the original. If instead you are trying to collapse all the key-value pairs from headers, info, etc. and possibly add core attributes, then JoltTransformJSON is probably the best choice. If that's what you meant please let me know and I'll help with the Jolt spec to do the transformation.
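As a rough illustration of the record-based idea, here is a small Python model, not the NiFi API. The `update_record` helper, the sample record and the attribute values are all hypothetical. It only mimics two effects described above: adding literal fields, and dropping any field that the writer schema does not list.

```python
def update_record(record, new_fields, writer_fields):
    """Add literal values, then keep only the fields named by the writer schema."""
    merged = {**record, **new_fields}
    return {name: merged[name] for name in writer_fields if name in merged}


# Hypothetical record read with the reader schema (three maps of strings).
record = {
    "headers": {"key_1": "value_1"},
    "info": {"key_1": "value_1"},
    "payload": {"key_1": "value_1"},
}

# Hypothetical core attribute values to add as literal fields.
core = {
    "uuid": "hypothetical-uuid",
    "kafka.topic": "my-topic",
    "kafka.partition": "1",
    "kafka.offset": 42,
}

# Field names taken from the writer schema above; "payload" is left out on purpose.
writer_fields = ["headers", "info", "uuid", "kafka.topic", "kafka.partition", "kafka.offset"]

result = update_record(record, core, writer_fields)
print(result)
```

In this model the headers and info maps stay nested objects, and payload disappears simply because the writer field list does not include it.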
Created on 03-21-2019 03:56 PM - edited 08-17-2019 04:47 PM
Thanks @Matt. Actually, the Jolt transformation is the solution that worked for me (I got there a few days ago and was looking for time to write up my answer 🙂).
In my flow I route on an attribute depending on whether I want the payload or not, and then I apply this Jolt spec:
[
  {
    "operation": "shift",
    "spec": {
      "headers": "headers",
      "info": "info",
      "payLoad": "payLoad"
    }
  },
  {
    "operation": "default",
    "spec": {
      "_kafka": {
        "offset": "${kafka.offset}",
        "partition": "${kafka.partition}",
        "topic": "${kafka.topic}",
        "key": "${kafka.key}"
      },
      "_nifi": {
        "flowfileuuid": "${uuid}"
      }
    }
  }
]
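For anyone who wants to see what the two operations do to the content, here is a simplified Python model of this particular spec. It is not Jolt itself. The `shift` and `apply_default` helpers and the attribute values are hypothetical, and the `${...}` expressions are assumed to have already been replaced with the FlowFile attribute values before the spec is applied.

```python
import copy


def shift(data, keys):
    # Models only this spec's shift: copy each listed top-level key
    # to the same name and drop everything else.
    return {key: copy.deepcopy(data[key]) for key in keys if key in data}


def apply_default(data, defaults):
    # Simplified "default": fill in keys that are missing, recursing into
    # nested objects; values already present in the data are kept.
    for key, value in defaults.items():
        if key not in data:
            data[key] = copy.deepcopy(value)
        elif isinstance(value, dict) and isinstance(data[key], dict):
            apply_default(data[key], value)
    return data


# Hypothetical input content and already-evaluated attribute values.
content = {
    "headers": {"key_1": "value_1"},
    "info": {"key_1": "value_1"},
    "payLoad": {"key_1": "value_1"},
    "extra": "dropped by the shift",
}
defaults = {
    "_kafka": {"offset": "42", "partition": "1", "topic": "my-topic", "key": "my-key"},
    "_nifi": {"flowfileuuid": "hypothetical-uuid"},
}

output = apply_default(shift(content, ["headers", "info", "payLoad"]), defaults)
print(output)
```

Note that the spec matches the key payLoad literally. Key matching in a shift spec is case-sensitive, so the spelling has to match the key in the actual content.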
Thanks for confirming that this is the right approach; I'm new to NiFi.