
Split JSON flow file and recompose to new flow content

New Contributor

Hi all,

I have JSON like this in my FlowFile content:

{
  "headers": {
    "key_1": "value_1",
    ...
    "key_n": "value_n"
  },
  "info": {
    "key_1": "value_1",
    ...
    "key_n": "value_n"
  },
  "payload": {
    "key_1": "value_1",
    ...
    "key_n": "value_n"
  }
}



My aim in the flow is:

  1. split the original JSON into three attributes (headers, info, and payload)
  2. route on a global variable into two subflows:
    2.1. merge the headers + info + core attributes
    2.2. merge the headers + info + payload attributes
  3. add core attributes (uuid, Kafka topic, Kafka partition, Kafka offset) to the new JSON content; a sketch of the intended result is shown below
  4. send the new JSON to Splunk with an InvokeHTTP POST
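To illustrate step 3, the result of subflow 2.2 should look roughly like this (values elided):

{
  "headers": { "key_1": "value_1", ..., "key_n": "value_n" },
  "info": { "key_1": "value_1", ..., "key_n": "value_n" },
  "payload": { "key_1": "value_1", ..., "key_n": "value_n" },
  "uuid": "...",
  "kafka.topic": "...",
  "kafka.partition": "...",
  "kafka.offset": "..."
}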

I've tried to implement step 1 with the EvaluateJsonPath processor and steps 2.1, 2.2, and 3 with the AttributesToJSON processor, but I've run into this problem:

in AttributesToJSON, if I use flowfile-content as the Destination, the merge works, but it writes the headers, info, and payload values as single escaped strings instead of nested key-value JSON objects. Here's an example:

{
  "headers": "{\"key_1\":\"value_1\", ... \"key_n\":\"value_n\"}",
  "info": "{\"key_1\":\"value_1\", ... \"key_n\":\"value_n\"}",
  "payload": "{\"key_1\":\"value_1\", ... \"key_n\":\"value_n\"}",
  "kafka.partition": "1"
}
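Roughly, the processor settings in play are these (reconstructed from the description above, so the JsonPath values are an assumption):

EvaluateJsonPath (step 1)
  Destination : flowfile-attribute
  Return Type : json
  headers     : $.headers      (user-defined property)
  info        : $.info         (user-defined property)
  payload     : $.payload      (user-defined property)

AttributesToJSON (steps 2 and 3)
  Destination     : flowfile-content
  Attributes List : headers,info,payload,kafka.partition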


Any ideas?


Thank you!

ACCEPTED SOLUTION

Master Guru

Instead of extracting the content into attributes (which will effectively turn the JSON objects into Strings), you should be able to add the core attributes using UpdateRecord or JoltTransformJSON/JoltTransformRecord. For the record processors, it looks like you could use the following schema for the reader:

{
 "namespace": "nifi",
 "name": "myRecord",
 "type": "record",
 "fields": [
  {"name": "headers","type": {"type": "map","values": "string"}},
  {"name": "info","type": {"type": "map","values": "string"}},
  {"name": "payload","type": {"type": "map","values": "string"}}
 ]
}

For the subflow to add core attributes, you'd want the writer schema to include the additional fields such as kafka.partition; then in UpdateRecord, for example, you could add a user-defined property /kafka.partition with value 1 (and Replacement Value Strategy "Literal Value"). If in the core-attribute subflow you want to remove the payload field, you can just remove it from the writer schema and it won't be included in the output. For example:

{
  "namespace": "nifi",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "headers","type": {"type": "map","values": "string"}},
    {"name": "info","type": {"type": "map","values": "string"}},
    {"name": "uuid","type": "string"},
    {"name": "kafka.topic","type": "string"},
    {"name": "kafka.partition","type": "string"},
    {"name": "kafka.offset","type": "long"}
  ]
}
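To make that concrete, a minimal sketch of the UpdateRecord setup for the core-attribute subflow could look like this; JsonTreeReader/JsonRecordSetWriter and the Expression Language values are assumptions, and a literal value such as 1 works the same way:

UpdateRecord
  Record Reader              : JsonTreeReader (with the reader schema above)
  Record Writer              : JsonRecordSetWriter (with the writer schema above)
  Replacement Value Strategy : Literal Value
  /uuid                      : ${uuid}
  /kafka.topic               : ${kafka.topic}
  /kafka.partition           : ${kafka.partition}
  /kafka.offset              : ${kafka.offset}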



If I understand correctly, your headers+info+payload content would remain the same as the original. If instead you are trying to collapse all the key-value pairs from headers, info, etc. and possibly add core attributes, then JoltTransformJSON is probably the best choice. If that's what you meant, please let me know and I'll help with the Jolt spec to do the transformation.
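For the collapsing case, a shift spec along these lines should hoist every key-value pair to the top level (a sketch only, and it assumes the keys are unique across the three objects):

[{
  "operation": "shift",
  "spec": {
    "headers": { "*": "&" },
    "info":    { "*": "&" },
    "payload": { "*": "&" }
  }
}]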



New Contributor

Thanks @Matt. The Jolt transformation is indeed the solution that worked for me (I got it working a few days ago and have been looking for time to write up my answer 🙂).


In my flow I route on an attribute depending on whether I want the payload or not, and then I apply this Jolt spec:


[{
    "operation": "shift",
    "spec": {
      "headers": "headers",
      "info": "info",
      "payLoad": "payLoad"
    }
  }, {
    "operation": "default",
    "spec": {
      "_kafka": {
        "offset": "${kafka.offset}",
        "partition": "${kafka.partition}",
        "topic": "${kafka.topic}",
        "key": "${kafka.key}"
      },
      "_nifi": {
        "flowfileuuid": "${uuid}"
      }
    }
}]
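Applied to the sample input above, the spec yields output along these lines (JoltTransformJSON evaluates the ${...} Expression Language against the flowfile's attributes before applying the spec; values elided):

{
  "headers": { "key_1": "value_1", ..., "key_n": "value_n" },
  "info": { "key_1": "value_1", ..., "key_n": "value_n" },
  "payLoad": { "key_1": "value_1", ..., "key_n": "value_n" },
  "_kafka": {
    "offset": "...",
    "partition": "...",
    "topic": "...",
    "key": "..."
  },
  "_nifi": {
    "flowfileuuid": "..."
  }
}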
Please confirm this is the right way; I'm new to NiFi.