Support Questions

Find answers, ask questions, and share your expertise

Apache Nifi to do aggregation for the given transformed json based on attribute name in the json

avatar
New Contributor


Hi Team ,

I need help in converting the given csv to json format .

This is the input csv.

EMP,EVSEID,SessionID,Starttime,Endtime,Totalchargeamount,Totalduration,Totalcosts
3023513,AT*HTB*E1002760,b8ed2c41-4a0a-4528-9f14-4c4f040de81e,15:06:56 26.10.2019,15:42:59 26.10.2019,5890,2163,2.592
2899202,AT*HTB*E1001927,ee3a1d64-2a69-4d43-87a4-4e23cb992660,16:24:11 08.10.2019,19:00:24 08.10.2019,6430,9373,3.78
2974330,DE*BOX*EAC0016,25543dea-6d58-4c27-a924-dacc01a83162,12:33:09 19.10.2019,15:29:58 19.10.2019,13700,10609,6.028.


I am able to convert the given csv to json output .

Need to add additional
header and footer tag, Footer tag will be holding the aggregated value of the totalcosts given in the csv file.

I am stuck in the process of performing aggregation based on the totalcosts.

 

Attached the generated json and expected json format

 

Below table is having generated json.

[ {
  "record" : {
    "EMP" : "3023513",
    "EVSEID" : "AT*HTB*E1002760",
    "SessionID" : "b8ed2c41-4a0a-4528-9f14-4c4f040de81e",
    "Starttime" : "15:06:56 26.10.2019",
    "Endtime" : "15:42:59 26.10.2019",
    "Totalchargeamount" : "5890",
    "Totalduration" : "2163",
    "Totalcosts" : 2.592
  }
}, {
  "record" : {
    "EMP" : "2899202",
    "EVSEID" : "AT*HTB*E1001927",
    "SessionID" : "ee3a1d64-2a69-4d43-87a4-4e23cb992660",
    "Starttime" : "16:24:11 08.10.2019",
    "Endtime" : "19:00:24 08.10.2019",
    "Totalchargeamount" : "6430",
    "Totalduration" : "9373",
    "Totalcosts" : 3.78
  }
}, {
  "record" : {
    "EMP" : "2974330",
    "EVSEID" : "DE*BOX*EAC0016",
    "SessionID" : "25543dea-6d58-4c27-a924-dacc01a83162",
    "Starttime" : "12:33:09 19.10.2019",
    "Endtime" : "15:29:58 19.10.2019",
    "Totalchargeamount" : "13700",
    "Totalduration" : "10609",
    "Totalcosts" : 6.028
  }
}, {
  "record" : {
    "EMP" : "2871249",
    "EVSEID" : "DE*EWE*E0237*1",
    "SessionID" : "ccc5050c-c708-4f5e-b240-70ecb248a491",
    "Starttime" : "11:24:41 04.10.2019",
    "Endtime" : "12:44:07 04.10.2019",
    "Totalchargeamount" : "6117",
    "Totalduration" : "4766",
    "Totalcosts" : 6
  }
}, {
  "record" : {
    "EMP" : "2928555",
    "EVSEID" : "AT*HTB*E1000955",
    "SessionID" : "a3941853-55f8-4c01-93ef-cd3b03f6c466",
    "Starttime" : "11:20:02 13.10.2019",
    "Endtime" : "11:39:12 13.10.2019",
    "Totalchargeamount" : "14000",
    "Totalduration" : "1150",
    "Totalcosts" : 7.7
  }
}

 

Expected output is as below

Extra tag needs to be added along with the above json

numberOfRecords - sum of all therecords
totalCosts : sum of all the costs in individual records
{

"dataRecordTrailer": {
    "numberOfRecords": "6",
    "totalCosts": 27.1
  }
}

Flow file template details

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.3">
    <description></description>
    <groupId>276b7fbd-0172-1000-60f9-afcd61d29acf</groupId>
    <name>Sprint7PocTemplate</name>
    <snippet>
        <controllerServices>
            <id>5f712eb5-b76b-3e9f-0000-000000000000</id>
            <parentGroupId>247dfa5d-d32a-3c9d-0000-000000000000</parentGroupId>
            <bundle>
                <artifact>nifi-record-serialization-services-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.11.4</version>
            </bundle>
            <comments></comments>
            <descriptors>
                <entry>
                    <key>ignore-csv-header</key>
                    <value>
                        <name>ignore-csv-header</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-branch</key>
                    <value>
                        <name>schema-branch</name>
                    </value>
                </entry>
                <entry>
                    <key>CSV Format</key>
                    <value>
                        <name>CSV Format</name>
                    </value>
                </entry>
                <entry>
                    <key>Quote Character</key>
                    <value>
                        <name>Quote Character</name>
                    </value>
                </entry>
                <entry>
                    <key>Value Separator</key>
                    <value>
                        <name>Value Separator</name>
                    </value>
                </entry>
                <entry>
                    <key>Timestamp Format</key>
                    <value>
                        <name>Timestamp Format</name>
                    </value>
                </entry>
                <entry>
                    <key>Escape Character</key>
                    <value>
                        <name>Escape Character</name>
                    </value>
                </entry>
                <entry>
                    <key>Date Format</key>
                    <value>
                        <name>Date Format</name>
                    </value>
                </entry>
                <entry>
                    <key>Null String</key>
                    <value>
                        <name>Null String</name>
                    </value>
                </entry>
                <entry>
                    <key>Skip Header Line</key>
                    <value>
                        <name>Skip Header Line</name>
                    </value>
                </entry>
                <entry>
                    <key>Trim Fields</key>
                    <value>
                        <name>Trim Fields</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-name</key>
                    <value>
                        <name>schema-name</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-registry</key>
                    <value>
                        <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
                        <name>schema-registry</name>
                    </value>
                </entry>
                <entry>
                    <key>csv-reader-csv-parser</key>
                    <value>
                        <name>csv-reader-csv-parser</name>
                    </value>
                </entry>
                <entry>
                    <key>Time Format</key>
                    <value>
                        <name>Time Format</name>
                    </value>
                </entry>
                <entry>
                    <key>Comment Marker</key>
                    <value>
                        <name>Comment Marker</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-access-strategy</key>
                    <value>
                        <name>schema-access-strategy</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-version</key>
                    <value>
                        <name>schema-version</name>
                    </value>
                </entry>
                <entry>
                    <key>csvutils-character-set</key>
                    <value>
                        <name>csvutils-character-set</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-text</key>
                    <value>
                        <name>schema-text</name>
                    </value>
                </entry>
            </descriptors>
            <name>DemoCSVReader</name>
            <persistsState>false</persistsState>
            <properties>
                <entry>
                    <key>ignore-csv-header</key>
                    <value>false</value>
                </entry>
                <entry>
                    <key>schema-branch</key>
                </entry>
                <entry>
                    <key>CSV Format</key>
                    <value>custom</value>
                </entry>
                <entry>
                    <key>Quote Character</key>
                    <value>"</value>
                </entry>
                <entry>
                    <key>Value Separator</key>
                    <value>,</value>
                </entry>
                <entry>
                    <key>Timestamp Format</key>
                </entry>
                <entry>
                    <key>Escape Character</key>
                    <value>\</value>
                </entry>
                <entry>
                    <key>Date Format</key>
                </entry>
                <entry>
                    <key>Null String</key>
                </entry>
                <entry>
                    <key>Skip Header Line</key>
                    <value>true</value>
                </entry>
                <entry>
                    <key>Trim Fields</key>
                    <value>true</value>
                </entry>
                <entry>
                    <key>schema-name</key>
                    <value>${rejectionreport}</value>
                </entry>
                <entry>
                    <key>schema-registry</key>
                    <value>d3aa8327-7bdb-3fca-0000-000000000000</value>
                </entry>
                <entry>
                    <key>csv-reader-csv-parser</key>
                    <value>commons-csv</value>
                </entry>
                <entry>
                    <key>Time Format</key>
                </entry>
                <entry>
                    <key>Comment Marker</key>
                </entry>
                <entry>
                    <key>schema-access-strategy</key>
                    <value>csv-header-derived</value>
                </entry>
                <entry>
                    <key>schema-version</key>
                </entry>
                <entry>
                    <key>csvutils-character-set</key>
                    <value>UTF-8</value>
                </entry>
                <entry>
                    <key>schema-text</key>
                </entry>
            </properties>
            <state>ENABLED</state>
            <type>org.apache.nifi.csv.CSVReader</type>
        </controllerServices>
        <controllerServices>
            <id>d3aa8327-7bdb-3fca-0000-000000000000</id>
            <parentGroupId>247dfa5d-d32a-3c9d-0000-000000000000</parentGroupId>
            <bundle>
                <artifact>nifi-registry-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.11.4</version>
            </bundle>
            <comments></comments>
            <descriptors>
                <entry>
                    <key>avro-reg-validated-field-names</key>
                    <value>
                        <name>avro-reg-validated-field-names</name>
                    </value>
                </entry>
                <entry>
                    <key>rejectionreport</key>
                    <value>
                        <name>rejectionreport</name>
                    </value>
                </entry>
            </descriptors>
            <name>DemoAvroSchemaRegistry</name>
            <persistsState>false</persistsState>
            <properties>
                <entry>
                    <key>avro-reg-validated-field-names</key>
                    <value>true</value>
                </entry>
                <entry>
                    <key>rejectionreport</key>
                    <value>{
 "name": "rejectionreport",
 "namespace": "org.apache.avro",
 "type": "record",
 "fields": [
 {"name": "EMP" , "type" : "string"},
 {"name": "EVSEID" , "type" : "string"},
 {"name": "SessionID" , "type" : "string"},
 {"name": "Starttime" , "type" : "string"},
 {"name": "Endtime" , "type" : "string"},
 {"name": "Totalchargeamount" , "type" : "string"},
 {"name": "Totalduration" , "type" : "string"},
 {"name": "Totalcosts" , "type" : "string"}
 ]
}</value>
                </entry>
            </properties>
            <state>ENABLED</state>
            <type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
        </controllerServices>
        <controllerServices>
            <id>f3c04453-2b4a-37bc-0000-000000000000</id>
            <parentGroupId>247dfa5d-d32a-3c9d-0000-000000000000</parentGroupId>
            <bundle>
                <artifact>nifi-record-serialization-services-nar</artifact>
                <group>org.apache.nifi</group>
                <version>1.11.4</version>
            </bundle>
            <comments></comments>
            <descriptors>
                <entry>
                    <key>schema-branch</key>
                    <value>
                        <name>schema-branch</name>
                    </value>
                </entry>
                <entry>
                    <key>compression-level</key>
                    <value>
                        <name>compression-level</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-cache</key>
                    <value>
                        <identifiesControllerService>org.apache.nifi.serialization.RecordSchemaCacheService</identifiesControllerService>
                        <name>schema-cache</name>
                    </value>
                </entry>
                <entry>
                    <key>Timestamp Format</key>
                    <value>
                        <name>Timestamp Format</name>
                    </value>
                </entry>
                <entry>
                    <key>Date Format</key>
                    <value>
                        <name>Date Format</name>
                    </value>
                </entry>
                <entry>
                    <key>Pretty Print JSON</key>
                    <value>
                        <name>Pretty Print JSON</name>
                    </value>
                </entry>
                <entry>
                    <key>compression-format</key>
                    <value>
                        <name>compression-format</name>
                    </value>
                </entry>
                <entry>
                    <key>Schema Write Strategy</key>
                    <value>
                        <name>Schema Write Strategy</name>
                    </value>
                </entry>
                <entry>
                    <key>suppress-nulls</key>
                    <value>
                        <name>suppress-nulls</name>
                    </value>
                </entry>
                <entry>
                    <key>output-grouping</key>
                    <value>
                        <name>output-grouping</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-name</key>
                    <value>
                        <name>schema-name</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-registry</key>
                    <value>
                        <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
                        <name>schema-registry</name>
                    </value>
                </entry>
                <entry>
                    <key>Time Format</key>
                    <value>
                        <name>Time Format</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-access-strategy</key>
                    <value>
                        <name>schema-access-strategy</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-version</key>
                    <value>
                        <name>schema-version</name>
                    </value>
                </entry>
                <entry>
                    <key>schema-text</key>
                    <value>
                        <name>schema-text</name>
                    </value>
                </entry>
            </descriptors>
            <name>DemoJsonRecordSetWriter</name>
            <persistsState>false</persistsState>
            <properties>
                <entry>
                    <key>schema-branch</key>
                </entry>
                <entry>
                    <key>compression-level</key>
                    <value>1</value>
                </entry>
                <entry>
                    <key>schema-cache</key>
                </entry>
                <entry>
                    <key>Timestamp Format</key>
                </entry>
                <entry>
                    <key>Date Format</key>
                </entry>
                <entry>
                    <key>Pretty Print JSON</key>
                    <value>true</value>
                </entry>
                <entry>
                    <key>compression-format</key>
                    <value>none</value>
                </entry>
                <entry>
                    <key>Schema Write Strategy</key>
                    <value>schema-name</value>
                </entry>
                <entry>
                    <key>suppress-nulls</key>
                    <value>never-suppress</value>
                </entry>
                <entry>
                    <key>output-grouping</key>
                    <value>output-array</value>
                </entry>
                <entry>
                    <key>schema-name</key>
                    <value>rejectionreport</value>
                </entry>
                <entry>
                    <key>schema-registry</key>
                    <value>d3aa8327-7bdb-3fca-0000-000000000000</value>
                </entry>
                <entry>
                    <key>Time Format</key>
                </entry>
                <entry>
                    <key>schema-access-strategy</key>
                    <value>schema-name</value>
                </entry>
                <entry>
                    <key>schema-version</key>
                </entry>
                <entry>
                    <key>schema-text</key>
                </entry>
            </properties>
            <state>ENABLED</state>
            <type>org.apache.nifi.json.JsonRecordSetWriter</type>
        </controllerServices>
        <processGroups>
            <id>5db4d3b6-1a02-390f-0000-000000000000</id>
            <parentGroupId>247dfa5d-d32a-3c9d-0000-000000000000</parentGroupId>
            <position>
                <x>0.0</x>
                <y>0.0</y>
            </position>
            <comments></comments>
            <contents>
                <connections>
                    <id>56a67400-a9c8-3de8-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <destination>
                        <groupId>5db4d3b6-1a02-390f-0000-000000000000</groupId>
                        <id>d80993a9-f700-3ce7-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
                    <loadBalancePartitionAttribute></loadBalancePartitionAttribute>
                    <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
                    <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
                    <name></name>
                    <selectedRelationships>success</selectedRelationships>
                    <source>
                        <groupId>5db4d3b6-1a02-390f-0000-000000000000</groupId>
                        <id>2e4486df-8884-3693-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <connections>
                    <id>749166ab-5510-31a0-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <destination>
                        <groupId>5db4d3b6-1a02-390f-0000-000000000000</groupId>
                        <id>4ed7dc7e-f06c-3a69-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
                    <loadBalancePartitionAttribute></loadBalancePartitionAttribute>
                    <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
                    <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
                    <name></name>
                    <selectedRelationships>success</selectedRelationships>
                    <source>
                        <groupId>5db4d3b6-1a02-390f-0000-000000000000</groupId>
                        <id>d80993a9-f700-3ce7-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <connections>
                    <id>d7db5fd5-453f-3841-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <destination>
                        <groupId>5db4d3b6-1a02-390f-0000-000000000000</groupId>
                        <id>15c3c2c4-7a5f-332c-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
                    <loadBalancePartitionAttribute></loadBalancePartitionAttribute>
                    <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
                    <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
                    <name></name>
                    <selectedRelationships>success</selectedRelationships>
                    <source>
                        <groupId>5db4d3b6-1a02-390f-0000-000000000000</groupId>
                        <id>4ed7dc7e-f06c-3a69-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <processors>
                    <id>15c3c2c4-7a5f-332c-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <position>
                        <x>-10784.0</x>
                        <y>-5800.0</y>
                    </position>
                    <bundle>
                        <artifact>nifi-standard-nar</artifact>
                        <group>org.apache.nifi</group>
                        <version>1.11.4</version>
                    </bundle>
                    <config>
                        <bulletinLevel>WARN</bulletinLevel>
                        <comments></comments>
                        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                        <descriptors>
                            <entry>
<key>Group</key>
<value>
    <name>Group</name>
</value>
                            </entry>
                            <entry>
<key>Owner</key>
<value>
    <name>Owner</name>
</value>
                            </entry>
                            <entry>
<key>Create Missing Directories</key>
<value>
    <name>Create Missing Directories</name>
</value>
                            </entry>
                            <entry>
<key>Permissions</key>
<value>
    <name>Permissions</name>
</value>
                            </entry>
                            <entry>
<key>Maximum File Count</key>
<value>
    <name>Maximum File Count</name>
</value>
                            </entry>
                            <entry>
<key>Last Modified Time</key>
<value>
    <name>Last Modified Time</name>
</value>
                            </entry>
                            <entry>
<key>Directory</key>
<value>
    <name>Directory</name>
</value>
                            </entry>
                            <entry>
<key>Conflict Resolution Strategy</key>
<value>
    <name>Conflict Resolution Strategy</name>
</value>
                            </entry>
                        </descriptors>
                        <executionNode>ALL</executionNode>
                        <lossTolerant>false</lossTolerant>
                        <penaltyDuration>30 sec</penaltyDuration>
                        <properties>
                            <entry>
<key>Group</key>
                            </entry>
                            <entry>
<key>Owner</key>
                            </entry>
                            <entry>
<key>Create Missing Directories</key>
<value>true</value>
                            </entry>
                            <entry>
<key>Permissions</key>
                            </entry>
                            <entry>
<key>Maximum File Count</key>
                            </entry>
                            <entry>
<key>Last Modified Time</key>
                            </entry>
                            <entry>
<key>Directory</key>
<value>C://seetha/syniverse/mediationsystem/output/</value>
                            </entry>
                            <entry>
<key>Conflict Resolution Strategy</key>
<value>fail</value>
                            </entry>
                        </properties>
                        <runDurationMillis>0</runDurationMillis>
                        <schedulingPeriod>0 sec</schedulingPeriod>
                        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                        <yieldDuration>1 sec</yieldDuration>
                    </config>
                    <executionNodeRestricted>false</executionNodeRestricted>
                    <name>PutFile</name>
                    <relationships>
                        <autoTerminate>true</autoTerminate>
                        <name>failure</name>
                    </relationships>
                    <relationships>
                        <autoTerminate>true</autoTerminate>
                        <name>success</name>
                    </relationships>
                    <state>STOPPED</state>
                    <style/>
                    <type>org.apache.nifi.processors.standard.PutFile</type>
                </processors>
                <processors>
                    <id>2e4486df-8884-3693-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <position>
                        <x>-10768.0</x>
                        <y>-6464.0</y>
                    </position>
                    <bundle>
                        <artifact>nifi-standard-nar</artifact>
                        <group>org.apache.nifi</group>
                        <version>1.11.4</version>
                    </bundle>
                    <config>
                        <bulletinLevel>WARN</bulletinLevel>
                        <comments></comments>
                        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                        <descriptors>
                            <entry>
<key>Keep Source File</key>
<value>
    <name>Keep Source File</name>
</value>
                            </entry>
                            <entry>
<key>Minimum File Age</key>
<value>
    <name>Minimum File Age</name>
</value>
                            </entry>
                            <entry>
<key>Polling Interval</key>
<value>
    <name>Polling Interval</name>
</value>
                            </entry>
                            <entry>
<key>Input Directory</key>
<value>
    <name>Input Directory</name>
</value>
                            </entry>
                            <entry>
<key>Maximum File Age</key>
<value>
    <name>Maximum File Age</name>
</value>
                            </entry>
                            <entry>
<key>Batch Size</key>
<value>
    <name>Batch Size</name>
</value>
                            </entry>
                            <entry>
<key>Maximum File Size</key>
<value>
    <name>Maximum File Size</name>
</value>
                            </entry>
                            <entry>
<key>Minimum File Size</key>
<value>
    <name>Minimum File Size</name>
</value>
                            </entry>
                            <entry>
<key>Ignore Hidden Files</key>
<value>
    <name>Ignore Hidden Files</name>
</value>
                            </entry>
                            <entry>
<key>Recurse Subdirectories</key>
<value>
    <name>Recurse Subdirectories</name>
</value>
                            </entry>
                            <entry>
<key>File Filter</key>
<value>
    <name>File Filter</name>
</value>
                            </entry>
                            <entry>
<key>Path Filter</key>
<value>
    <name>Path Filter</name>
</value>
                            </entry>
                        </descriptors>
                        <executionNode>ALL</executionNode>
                        <lossTolerant>false</lossTolerant>
                        <penaltyDuration>30 sec</penaltyDuration>
                        <properties>
                            <entry>
<key>Keep Source File</key>
<value>false</value>
                            </entry>
                            <entry>
<key>Minimum File Age</key>
<value>0 sec</value>
                            </entry>
                            <entry>
<key>Polling Interval</key>
<value>0 sec</value>
                            </entry>
                            <entry>
<key>Input Directory</key>
<value>C://seetha/syniverse/mediationsystem/input/</value>
                            </entry>
                            <entry>
<key>Maximum File Age</key>
                            </entry>
                            <entry>
<key>Batch Size</key>
<value>10</value>
                            </entry>
                            <entry>
<key>Maximum File Size</key>
                            </entry>
                            <entry>
<key>Minimum File Size</key>
<value>0 B</value>
                            </entry>
                            <entry>
<key>Ignore Hidden Files</key>
<value>true</value>
                            </entry>
                            <entry>
<key>Recurse Subdirectories</key>
<value>true</value>
                            </entry>
                            <entry>
<key>File Filter</key>
<value>[^\.].*</value>
                            </entry>
                            <entry>
<key>Path Filter</key>
                            </entry>
                        </properties>
                        <runDurationMillis>0</runDurationMillis>
                        <schedulingPeriod>0 sec</schedulingPeriod>
                        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                        <yieldDuration>1 sec</yieldDuration>
                    </config>
                    <executionNodeRestricted>false</executionNodeRestricted>
                    <name>GetFile</name>
                    <relationships>
                        <autoTerminate>false</autoTerminate>
                        <name>success</name>
                    </relationships>
                    <state>STOPPED</state>
                    <style/>
                    <type>org.apache.nifi.processors.standard.GetFile</type>
                </processors>
                <processors>
                    <id>4ed7dc7e-f06c-3a69-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <position>
                        <x>-10776.0</x>
                        <y>-6024.0</y>
                    </position>
                    <bundle>
                        <artifact>nifi-update-attribute-nar</artifact>
                        <group>org.apache.nifi</group>
                        <version>1.11.4</version>
                    </bundle>
                    <config>
                        <bulletinLevel>WARN</bulletinLevel>
                        <comments></comments>
                        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                        <descriptors>
                            <entry>
<key>Delete Attributes Expression</key>
<value>
    <name>Delete Attributes Expression</name>
</value>
                            </entry>
                            <entry>
<key>filename</key>
<value>
    <name>filename</name>
</value>
                            </entry>
                            <entry>
<key>Store State</key>
<value>
    <name>Store State</name>
</value>
                            </entry>
                            <entry>
<key>canonical-value-lookup-cache-size</key>
<value>
    <name>canonical-value-lookup-cache-size</name>
</value>
                            </entry>
                            <entry>
<key>Stateful Variables Initial Value</key>
<value>
    <name>Stateful Variables Initial Value</name>
</value>
                            </entry>
                        </descriptors>
                        <executionNode>ALL</executionNode>
                        <lossTolerant>false</lossTolerant>
                        <penaltyDuration>30 sec</penaltyDuration>
                        <properties>
                            <entry>
<key>Delete Attributes Expression</key>
                            </entry>
                            <entry>
<key>filename</key>
<value>${filename}.json</value>
                            </entry>
                            <entry>
<key>Store State</key>
<value>Do not store state</value>
                            </entry>
                            <entry>
<key>canonical-value-lookup-cache-size</key>
<value>100</value>
                            </entry>
                            <entry>
<key>Stateful Variables Initial Value</key>
                            </entry>
                        </properties>
                        <runDurationMillis>0</runDurationMillis>
                        <schedulingPeriod>0 sec</schedulingPeriod>
                        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                        <yieldDuration>1 sec</yieldDuration>
                    </config>
                    <executionNodeRestricted>false</executionNodeRestricted>
                    <name>UpdateAttribute</name>
                    <relationships>
                        <autoTerminate>false</autoTerminate>
                        <name>success</name>
                    </relationships>
                    <state>STOPPED</state>
                    <style/>
                    <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                </processors>
                <processors>
                    <id>d80993a9-f700-3ce7-0000-000000000000</id>
                    <parentGroupId>5db4d3b6-1a02-390f-0000-000000000000</parentGroupId>
                    <position>
                        <x>-10768.0</x>
                        <y>-6240.0</y>
                    </position>
                    <bundle>
                        <artifact>nifi-standard-nar</artifact>
                        <group>org.apache.nifi</group>
                        <version>1.11.4</version>
                    </bundle>
                    <config>
                        <bulletinLevel>WARN</bulletinLevel>
                        <comments></comments>
                        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                        <descriptors>
                            <entry>
<key>record-writer</key>
<value>
    <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
    <name>record-writer</name>
</value>
                            </entry>
                            <entry>
<key>record-reader</key>
<value>
    <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
    <name>record-reader</name>
</value>
                            </entry>
                            <entry>
<key>include-zero-record-flowfiles</key>
<value>
    <name>include-zero-record-flowfiles</name>
</value>
                            </entry>
                        </descriptors>
                        <executionNode>ALL</executionNode>
                        <lossTolerant>false</lossTolerant>
                        <penaltyDuration>30 sec</penaltyDuration>
                        <properties>
                            <entry>
<key>record-writer</key>
<value>f3c04453-2b4a-37bc-0000-000000000000</value>
                            </entry>
                            <entry>
<key>record-reader</key>
<value>5f712eb5-b76b-3e9f-0000-000000000000</value>
                            </entry>
                            <entry>
<key>include-zero-record-flowfiles</key>
<value>true</value>
                            </entry>
                        </properties>
                        <runDurationMillis>0</runDurationMillis>
                        <schedulingPeriod>0 sec</schedulingPeriod>
                        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                        <yieldDuration>1 sec</yieldDuration>
                    </config>
                    <executionNodeRestricted>false</executionNodeRestricted>
                    <name>ConvertRecord</name>
                    <relationships>
                        <autoTerminate>true</autoTerminate>
                        <name>failure</name>
                    </relationships>
                    <relationships>
                        <autoTerminate>false</autoTerminate>
                        <name>success</name>
                    </relationships>
                    <state>STOPPED</state>
                    <style/>
                    <type>org.apache.nifi.processors.standard.ConvertRecord</type>
                </processors>
            </contents>
            <name>sprint7PocDemo</name>
            <variables/>
        </processGroups>
    </snippet>
    <timestamp>05/21/2020 17:21:25 IST</timestamp>
</template>

 

Please guide me how to proceed the aggregation for the given json

 

Thanks & Regards,

P.Seetha

 

 

 

3 ACCEPTED SOLUTIONS

avatar

Nifi is not really designed to work with 'context'. If you have a simple record there are many operations which you can do, but if you are working with potentially complex files and thus complex operations, you will likely rather process them with something like spark or python.


- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or if it is valuable for future readers please apply 'kudos'.

View solution in original post

avatar
Super Guru

@Seetha This is a very common use case for NiFi and JSON processing pipelines.

 

Here is a link that explains a solution (ExecuteScript) you could use:

 

https://community.cloudera.com/t5/Support-Questions/Apache-Nifi-How-to-calculate-SUM-or-AVERAGE-of-v...

 

Additional @mburgess  in that posts links a JIRA for a new Processor he was trying to work on at the time.  The end result of that JIRA is his recommendation that QueryRecord processor should give you the ability to calculate the sum.   Using QueryRecord you would read the values and be able to create a fabricated sql query to calculate the sums.  Then you would use a RecordWriter to re-write the orginal json object with the sums, or to create completely different json object with the sums.

 

 

If this answer resolves your issue or allows you to move forward, please choose to ACCEPT this solution and close this topic. If you have further dialogue on this topic please comment here or feel free to private message me. If you have new questions related to your Use Case please create separate topic and feel free to tag me in your post.  

 

Thanks,


Steven @ DFHZ

View solution in original post

avatar
New Contributor

@DennisJaheruddi  Thanks Dennis.I tried to use QueryRecord ,it helped to get the count.

-Seetha

View solution in original post

4 REPLIES 4

avatar

Nifi is not really designed to work with 'context'. If you have a simple record there are many operations which you can do, but if you are working with potentially complex files and thus complex operations, you will likely rather process them with something like spark or python.


- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or if it is valuable for future readers please apply 'kudos'.

avatar
New Contributor

@DennisJaheruddi  Thanks Dennis.I tried to use QueryRecord ,it helped to get the count.

-Seetha

avatar
Super Guru

@Seetha This is a very common use case for NiFi and JSON processing pipelines.

 

Here is a link that explains a solution (ExecuteScript) you could use:

 

https://community.cloudera.com/t5/Support-Questions/Apache-Nifi-How-to-calculate-SUM-or-AVERAGE-of-v...

 

Additional @mburgess  in that posts links a JIRA for a new Processor he was trying to work on at the time.  The end result of that JIRA is his recommendation that QueryRecord processor should give you the ability to calculate the sum.   Using QueryRecord you would read the values and be able to create a fabricated sql query to calculate the sums.  Then you would use a RecordWriter to re-write the orginal json object with the sums, or to create completely different json object with the sums.

 

 

If this answer resolves your issue or allows you to move forward, please choose to ACCEPT this solution and close this topic. If you have further dialogue on this topic please comment here or feel free to private message me. If you have new questions related to your Use Case please create separate topic and feel free to tag me in your post.  

 

Thanks,


Steven @ DFHZ

avatar
New Contributor

@stevenmatison  Thanks .I used QueryRecord ,it helped to get count .