Support Questions

Find answers, ask questions, and share your expertise

NiFi: Explode a JSON array while keeping root level fields

avatar
New Contributor

I am in a scenario where others define & change the schema on a regular basis and the data pipeline needs to pass through the data. Our developers are using a message envelope with some common fields, and then an array of individual messages. So in my case I might have something like:

{
  "user_id": 123,
  "other_root_field": "blah",
  "parent": {
    "events": [
      {
        "nested_1": "a",
        "nested_2": "b"
      },
      {
        "nested_3": "c",
        "nested_1": "d"
      }
    ]
  }
}

What I want to do is pull out all the individual events, add the data from the envelope and write them to Kafka (still in JSON format). It seems like I should use the JoltTransformJSON processor, followed by a SplitJSON process & finally a KafkaProducer (please correct me if there is a better way).

The first event from the example above would look like:

{"user_id":123,"other_root_field":"blah","exploded_nested_1":"a","exploded_nested_2":"b"}

Note that the fields from the array have an "exploded_" prefix added - this is to avoid name collision between any fields defined on the envelope and those in the individual events.

To get there it seems like I should produce this from Jolt:

[
  {
    "user_id": 123,
    "other_root_field": "blah",
    "exploded_nested_1": "a",
    "exploded_nested_2": "b"
  },
  {
    "user_id": 123,
    "other_root_field": "blah",
    "exploded_nested_3": "c",
    "exploded_nested_1": "d"
  }
]

I can't seem to quite get there however:

1. I can't get Jolt to add the prefix to the fields in the array.

[{"operation":"shift","spec":{"parent":{"events":{"*":{"@":"[exploded_&]"}}}}}]

This gives me an error that exploded_& is an invalid index for the array. Using just [&] will output the existing field names though.

2. I can't figure out how to include fields on the root, but exclude the "parent" that holds the array.

[{"operation":"shift","spec":{"parent":{"events":{"*":{"@3":"[&]"}}}}}]

Will get me an array entry for every event with all data in each one - I need a way to say all events on the root except "parent".

Help would be greatly appreciated.

Thanks,

--Ben

2 REPLIES 2

avatar
Master Guru

Hi @Ben Vogan,

you can make use of below jolt specification

[
  {
    "operation": "shift",
    "spec": {
      "parent": {
        "events": {
          "*": {
            "@(3,user_id)": "events[&1].user_id",
            "@(4,other_root_field)": "events[&1].other_root_field",
            "nested_1": "events[&1].exploded_nested_1",
            "nested_2": "events[&1].exploded_nested_2",
            "nested_3": "events[&1].exploded_nested_3"
          }
        }
      }
    }
  }
  ]

input:-

{
  "user_id": 123,
  "other_root_field": "blah",
  "parent": {
    "events": [
      {
        "nested_1": "a",
        "nested_2": "b"
      },
      {
        "nested_3": "c",
        "nested_1": "d"
      }
    ]
  }
}

output:-

{
  "events" : [ {
    "user_id" : 123,
    "other_root_field" : "blah",
    "exploded_nested_1" : "a",
    "exploded_nested_2" : "b"
  }, {
    "user_id" : 123,
    "other_root_field" : "blah",
    "exploded_nested_1" : "d",
    "exploded_nested_3" : "c"
  } ]
}

jolt.png

avatar
New Contributor

@Yash thanks for your reply. However my problem is that I do not know the set of fields (either on the root, or inside the array elements) - it is always changing and I don't want to have to update the spec every time someone adds a field. The spec should only know about parent.events and not assume the existence of any other field. I need a way to say "copy everything at the root, except for the parent field."

What I've done for the moment is just implemented the logic in Jython - although it is fairly slow.