<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Split JSON flow file and recompose to new flow content in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235598#M197411</link>
    <description>&lt;P&gt;Instead of extracting the content into attributes (which will effectively turn the JSON objects into Strings), you should be able to add the core attributes using UpdateRecord or JoltTransformJSON/JoltTransformRecord. For the record processors, it looks like you could use the following schema for the reader:&lt;/P&gt;&lt;PRE&gt;{
&amp;nbsp;"namespace": "nifi",
&amp;nbsp;"name": "myRecord",
&amp;nbsp;"type": "record",
&amp;nbsp;"fields": [
&amp;nbsp; {"name": "headers","type": {"type": "map","values": "string"}},
&amp;nbsp; {"name": "info","type": {"type": "map","values": "string"}},
&amp;nbsp; {"name": "payload","type": {"type": "map","values": "string"}}
&amp;nbsp;]
}&lt;/PRE&gt;&lt;P&gt;For the subflow that adds core attributes, the writer schema should include the additional fields such as kafka.partition. Then in UpdateRecord, for example, you could add a user-defined property /kafka.partition with value 1 (and Replacement Value Strategy "Literal Value"). If you want to remove the payload field in the core-attribute subflow, you can simply leave it out of the writer schema, and it won't be included in the output. For example:&lt;/P&gt;&lt;PRE&gt;{
  "namespace": "nifi",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "headers","type": {"type": "map","values": "string"}},
    {"name": "info","type": {"type": "map","values": "string"}},
    {"name": "uuid","type": "string"},
    {"name": "kafka.topic","type": "string"},
    {"name": "kafka.partition","type": "string"},
    {"name": "kafka.offset","type": "long"}
  ]
}&lt;/PRE&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;If I understand correctly, your headers+info+payload content would remain the same as the original. If instead you are trying to collapse all the key-value pairs from headers, info, etc. and possibly add core attributes, then JoltTransformJSON is probably the best choice. If that's what you meant please let me know and I'll help with the Jolt spec to do the transformation.&lt;/P&gt;</description>
    <pubDate>Thu, 21 Mar 2019 19:55:23 GMT</pubDate>
    <dc:creator>mburgess</dc:creator>
    <dc:date>2019-03-21T19:55:23Z</dc:date>
    <item>
      <title>Split JSON flow file and recompose to new flow content</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235597#M197410</link>
      <description>&lt;P&gt;Hi all,&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;I have JSON in my FlowFile content like this:&lt;/P&gt;&lt;PRE&gt;{
  "headers": {
    "key_1":"value_1",
    ......
    "key_n":"value_n"},
  "info": {
    "key_1":"value_1",
    ......
    "key_n":"value_n"},
  "payload": {
    "key_1":"value_1",
    ......
    "key_n":"value_n"}
}&lt;/PRE&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;My aim in the flow is to:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;split the original JSON into three attributes (headers, payload and info)&lt;/LI&gt;&lt;LI&gt;route on a global variable into two subflows&lt;OL&gt;&lt;LI&gt;merge headers+info+core attributes&lt;/LI&gt;&lt;LI&gt;merge headers+info+payload attributes&lt;/LI&gt;&lt;/OL&gt;&lt;/LI&gt;&lt;LI&gt;add core attributes to the new JSON content (such as uuid, kafka topic, kafka partition, kafka offset)&lt;/LI&gt;&lt;LI&gt;send the new JSON to Splunk with an InvokeHTTP POST&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;I've tried to implement step 1 with the EvaluateJsonPath processor and steps 2.1, 2.2 and 3 with the AttributesToJSON processor, but I ran into this problem:&lt;/P&gt;&lt;P&gt;in &lt;STRONG&gt;AttributesToJSON&lt;/STRONG&gt;, if I use flowfile-content as the &lt;STRONG&gt;Destination&lt;/STRONG&gt;, the merge works, but it writes the headers, info and payload values as single escaped strings instead of nested key-value JSON objects. Here is an example:&lt;/P&gt;&lt;PRE&gt;{
  "headers": "{\"key_1\":\"value_1\",
               ......
               \"key_n\":\"value_n\"}]}}}",
  "info": "{\"key_1\":\"value_1\",
            ......
            \"key_n\":\"value_n\"}]}}}",
  "payload": "{\"key_1\":\"value_1\",
               ......
               \"key_n\":\"value_n\"}]}}}",
  "kafka.partition": "1"
}&lt;/PRE&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;Any ideas?&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;Thank you!&lt;/P&gt;</description>
      <pubDate>Thu, 14 Mar 2019 04:48:10 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235597#M197410</guid>
      <dc:creator>pietro_fragnito</dc:creator>
      <dc:date>2019-03-14T04:48:10Z</dc:date>
    </item>
    <item>
      <title>Re: Split JSON flow file and recompose to new flow content</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235598#M197411</link>
      <description>&lt;P&gt;Instead of extracting the content into attributes (which will effectively turn the JSON objects into Strings), you should be able to add the core attributes using UpdateRecord or JoltTransformJSON/JoltTransformRecord. For the record processors, it looks like you could use the following schema for the reader:&lt;/P&gt;&lt;PRE&gt;{
&amp;nbsp;"namespace": "nifi",
&amp;nbsp;"name": "myRecord",
&amp;nbsp;"type": "record",
&amp;nbsp;"fields": [
&amp;nbsp; {"name": "headers","type": {"type": "map","values": "string"}},
&amp;nbsp; {"name": "info","type": {"type": "map","values": "string"}},
&amp;nbsp; {"name": "payload","type": {"type": "map","values": "string"}}
&amp;nbsp;]
}&lt;/PRE&gt;&lt;P&gt;For the subflow that adds core attributes, the writer schema should include the additional fields such as kafka.partition. Then in UpdateRecord, for example, you could add a user-defined property /kafka.partition with value 1 (and Replacement Value Strategy "Literal Value"). If you want to remove the payload field in the core-attribute subflow, you can simply leave it out of the writer schema, and it won't be included in the output. For example:&lt;/P&gt;&lt;PRE&gt;{
  "namespace": "nifi",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "headers","type": {"type": "map","values": "string"}},
    {"name": "info","type": {"type": "map","values": "string"}},
    {"name": "uuid","type": "string"},
    {"name": "kafka.topic","type": "string"},
    {"name": "kafka.partition","type": "string"},
    {"name": "kafka.offset","type": "long"}
  ]
}&lt;/PRE&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;If I understand correctly, your headers+info+payload content would remain the same as the original. If instead you are trying to collapse all the key-value pairs from headers, info, etc. and possibly add core attributes, then JoltTransformJSON is probably the best choice. If that's what you meant please let me know and I'll help with the Jolt spec to do the transformation.&lt;/P&gt;</description>
      <pubDate>Thu, 21 Mar 2019 19:55:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235598#M197411</guid>
      <dc:creator>mburgess</dc:creator>
      <dc:date>2019-03-21T19:55:23Z</dc:date>
    </item>
    <item>
      <title>Re: Split JSON flow file and recompose to new flow content</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235599#M197412</link>
      <description>&lt;P&gt;Thanks &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/103"&gt;@Matt&lt;/a&gt;. Jolt transformation is actually the solution that worked for me (I got there a few days ago and was looking for time to write up my answer &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt; ).&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;In my flow I route on an attribute depending on whether I want the payload or not, and then I apply this Jolt spec:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="107381-1553183657678.png" style="width: 874px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14519i7E5E09DE1DE88A3C/image-size/medium?v=v2&amp;amp;px=400" role="button" title="107381-1553183657678.png" alt="107381-1553183657678.png" /&gt;&lt;/span&gt;&lt;BR /&gt;&lt;/P&gt;&lt;PRE&gt;[{
&amp;nbsp; &amp;nbsp; "operation": "shift",
&amp;nbsp; &amp;nbsp; "spec": {
&amp;nbsp; &amp;nbsp; &amp;nbsp; "headers": "headers",
&amp;nbsp; &amp;nbsp; &amp;nbsp; "info": "info",
&amp;nbsp; &amp;nbsp; &amp;nbsp; "payload": "payload"
&amp;nbsp; &amp;nbsp; }
&amp;nbsp; }, {
&amp;nbsp; &amp;nbsp; "operation": "default",
&amp;nbsp; &amp;nbsp; "spec": {
&amp;nbsp; &amp;nbsp; &amp;nbsp; "_kafka": {
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "offset": "${kafka.offset}",
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "partition": "${kafka.partition}",
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "topic": "${kafka.topic}",
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "key": "${kafka.key}"
&amp;nbsp; &amp;nbsp; &amp;nbsp; },
&amp;nbsp; &amp;nbsp; &amp;nbsp; "_nifi": {
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "flowfileuuid": "${uuid}"
&amp;nbsp; &amp;nbsp; &amp;nbsp; }
&amp;nbsp; &amp;nbsp; }
&amp;nbsp; }]&lt;/PRE&gt;&lt;P&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;Thanks for confirming that this is the right approach; I'm new to NiFi.&lt;/P&gt;</description>
      <pubDate>Sat, 17 Aug 2019 23:47:56 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Split-JSON-flow-file-and-recompose-to-new-flow-content/m-p/235599#M197412</guid>
      <dc:creator>pietro_fragnito</dc:creator>
      <dc:date>2019-08-17T23:47:56Z</dc:date>
    </item>
  </channel>
</rss>

