Created 09-17-2017 09:02 AM
I am in a scenario where others define and change the schema on a regular basis, and the data pipeline needs to pass the data through. Our developers use a message envelope with some common fields, plus an array of individual messages. In my case I might have something like:
{ "user_id": 123, "other_root_field": "blah", "parent": { "events": [ { "nested_1": "a", "nested_2": "b" }, { "nested_3": "c", "nested_1": "d" } ] } }
What I want to do is pull out all the individual events, add the data from the envelope to each one, and write them to Kafka (still in JSON format). It seems like I should use the JoltTransformJSON processor, followed by a SplitJson processor and finally a Kafka publishing processor such as PublishKafka (please correct me if there is a better way).
The first event from the example above would look like:
{"user_id":123,"other_root_field":"blah","exploded_nested_1":"a","exploded_nested_2":"b"}
Note that the fields from the array have an "exploded_" prefix added - this is to avoid name collision between any fields defined on the envelope and those in the individual events.
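A quick Python illustration of the collision the prefix guards against. This is only a sketch: the clashing "user_id" inside the event is a hypothetical value that does not appear in the sample data above.

```python
# Envelope fields from the example, plus a hypothetical event that
# happens to reuse the envelope's "user_id" field name.
envelope = {"user_id": 123, "other_root_field": "blah"}
event = {"user_id": "from-event", "nested_1": "a"}

# Naive merge: the event's value silently overwrites the envelope's.
naive = dict(envelope)
naive.update(event)

# Prefixed merge: both values survive under distinct keys.
prefixed = dict(envelope)
prefixed.update(("exploded_" + key, value) for key, value in event.items())

print(naive["user_id"])
print(prefixed["user_id"])
print(prefixed["exploded_user_id"])
```

With the prefix, the envelope's "user_id" is preserved and the event's copy lands in "exploded_user_id".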
To get that per-event record, it seems like I should produce this array from Jolt:
[ { "user_id": 123, "other_root_field": "blah", "exploded_nested_1": "a", "exploded_nested_2": "b" }, { "user_id": 123, "other_root_field": "blah", "exploded_nested_3": "c", "exploded_nested_1": "d" } ]
I can't seem to quite get there however:
1. I can't get Jolt to add the prefix to the fields in the array.
[{"operation":"shift","spec":{"parent":{"events":{"*":{"@":"[exploded_&]"}}}}}]
This gives me an error that exploded_& is an invalid index for the array. Using just [&] will output the existing field names though.
2. I can't figure out how to include fields on the root, but exclude the "parent" that holds the array.
[{"operation":"shift","spec":{"parent":{"events":{"*":{"@3":"[&]"}}}}}]
This gets me an array entry for every event, but with all of the root data (including "parent") in each one. I need a way to say: all fields on the root except "parent".
Help would be greatly appreciated.
Thanks,
--Ben
Created 09-17-2017 02:41 PM
Hi @Ben Vogan,
You can use the Jolt specification below:
[ { "operation": "shift", "spec": { "parent": { "events": { "*": { "@(3,user_id)": "events[&1].user_id", "@(3,other_root_field)": "events[&1].other_root_field", "nested_1": "events[&1].exploded_nested_1", "nested_2": "events[&1].exploded_nested_2", "nested_3": "events[&1].exploded_nested_3" } } } } } ]
Input:
{ "user_id": 123, "other_root_field": "blah", "parent": { "events": [ { "nested_1": "a", "nested_2": "b" }, { "nested_3": "c", "nested_1": "d" } ] } }
Output:
{ "events" : [ { "user_id" : 123, "other_root_field" : "blah", "exploded_nested_1" : "a", "exploded_nested_2" : "b" }, { "user_id" : 123, "other_root_field" : "blah", "exploded_nested_1" : "d", "exploded_nested_3" : "c" } ] }
Created 09-17-2017 03:08 PM
@Yash thanks for your reply. However my problem is that I do not know the set of fields (either on the root, or inside the array elements) - it is always changing and I don't want to have to update the spec every time someone adds a field. The spec should only know about parent.events and not assume the existence of any other field. I need a way to say "copy everything at the root, except for the parent field."
For the moment I have just implemented the logic in Jython, although it is fairly slow.
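The logic itself is small. Below is a minimal plain-Python sketch of it, for illustration only: it is not the script actually used in the flow, the function name explode_events and the prefix parameter are made up here, and the NiFi scripting glue (session and flow file handling) is left out. It only knows about parent.events and copies every other root field as-is.

```python
import json


def explode_events(document, prefix="exploded_"):
    """Return one flat record per entry in document["parent"]["events"]."""
    # Envelope = every root field except "parent"; no field names are hard-coded.
    envelope = dict((k, v) for k, v in document.items() if k != "parent")
    # Tolerate a missing "parent" or "events" by producing no records.
    events = (document.get("parent") or {}).get("events") or []
    records = []
    for event in events:
        record = dict(envelope)
        for key, value in event.items():
            # Prefix event fields so they cannot collide with envelope fields.
            record[prefix + key] = value
        records.append(record)
    return records


raw = ('{"user_id": 123, "other_root_field": "blah", "parent": {"events": '
       '[{"nested_1": "a", "nested_2": "b"}, {"nested_3": "c", "nested_1": "d"}]}}')

# One JSON document per event, ready to be sent on individually.
for record in explode_events(json.loads(raw)):
    print(json.dumps(record, sort_keys=True))
```

Because each event comes out as its own JSON document, a separate split step would not be needed with this approach. New fields on the root or inside the events pass through without any change to the code.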