Extract field from flow files to comma separated batches

Explorer

I have a flow that extracts an ID from a flow file, then runs an ExecuteSQL to fetch some additional data. Since this happens one record at a time, it's very inefficient (~300 records per minute, which doesn't sound bad until it's processing a million records).

 

Is there a way to run flow files through a process and extract the required value in batches of 50 or 100 so that it returns a comma-separated value (like 123, 124, 125, 126, ... etc.) to pass to the ExecuteSQL so it's returning data in larger batches instead of one at a time?

 

Currently, it's doing an EvaluateJsonPath, pulling that one value to a property so the ExecuteSQL processor can construct the SQL like "SELECT * FROM TABLE_NAME WHERE KEY = ${key}".  I'd like to modify that so it's "...WHERE KEY IN (${keyList})"  and keyList is the comma-separated list created above. 

 

Not sure how to approach this, however. 
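
To make the target concrete, here is a minimal Python sketch (not NiFi code) that turns a batch of keys into the "...WHERE KEY IN (...)" statement described above. The function name build_in_query is hypothetical, TABLE_NAME and KEY come from the question, and the keys are assumed to be integers: the int() call rejects anything else, so arbitrary text cannot end up in the SQL.

```python
def build_in_query(keys, table="TABLE_NAME", column="KEY"):
    """Return one SELECT that fetches every key in the batch."""
    if not keys:
        raise ValueError("need at least one key")
    # int() raises ValueError on non-numeric input, so only digits reach the SQL text.
    key_list = ", ".join(str(int(k)) for k in keys)
    return f"SELECT * FROM {table} WHERE {column} IN ({key_list})"


print(build_in_query([123, 124, 125, 126]))
```

In NiFi the resulting key list would be placed in an attribute such as ${keyList}; how the batch of keys is collected is the open part of the question.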

5 REPLIES

Explorer

I was looking at the scripting capabilities... Is something like the outline below possible? (Pseudocode, obviously.)

 

while (file = get next flow file from input) {
    data = ...extract field...
    append data to string
    if (extracted 50 or more values) {
        emit new flowfile with string as attribute
        reset string and counter
    }
}
emit any remaining values as a final flowfile

 

I've seen plenty of examples of what I'd consider "transform a single flowfile into something else", but none where a script fetches further flowfiles from the session. A script gets a flowfile with "flowfile = session.get();", but it's unclear whether you can do that multiple times in the same script.
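
The outline above can be tried out in plain Python before involving NiFi. This is only a sketch under stated assumptions: batch_ids is a hypothetical helper, the input iterable stands in for the incoming flow files, and each yielded string stands in for the attribute of an emitted flow file. It does not answer whether a script processor can pull several flow files per run.

```python
def batch_ids(ids, batch_size=50):
    """Yield comma-separated strings holding at most batch_size IDs each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch = []
    for value in ids:                 # stand-in for "get next flow file"
        batch.append(str(value))      # stand-in for "extract field"
        if len(batch) == batch_size:
            yield ", ".join(batch)    # stand-in for "emit new flowfile"
            batch = []                # start the next batch empty
    if batch:                         # flush the final, partial batch
        yield ", ".join(batch)


for key_list in batch_ids(range(123, 130), batch_size=3):
    print(key_list)
```

The final flush matters: without it, any records left over after the last full batch would never reach the SQL step.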

Contributor

Perhaps we could first read the flow file content as JSON and then parse it into a JSON object (e.g. in a Groovy script, we might use JsonSlurper to handle flow file content that is valid JSON).

OliverGong_1-1642189626485.png

 

OliverGong_0-1642189591402.png


The rest is then an iteration loop over the ID array, following the same logic to separate it into pages.
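
For readers without a Groovy environment, the same parsing step can be sketched in Python with the standard json module. The sample content mirrors the mock request body used in the template attached further down; extract_ids is a hypothetical helper and the "ID" field name is assumed from that mock data, not taken from the original flow.

```python
import json


def extract_ids(content, field="ID"):
    """Parse flow file content (a JSON array of objects) and return the IDs."""
    records = json.loads(content)
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of objects")
    return [record[field] for record in records]


content = '[{"ID": 123, "params": "xxx"}, {"ID": 124, "params": "xxx"}]'
ids = extract_ids(content)
print(", ".join(str(i) for i in ids))
```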

Contributor

Here is a customized way to implement such an algorithm without writing any Groovy/Java/Python/etc. script.
1. Manually design an iteration loop to prepare the paged/batched SQL string.
2. Inside the loop, we can focus on manipulating the pageNumber (loop index) to prepare each SQL statement accordingly.

For instance, we can use a JOLT specification in a JoltTransformJSON processor to group those IDs into an IDList array; the result would look like [123,124,125,...].

After that, we can design an iteration loop to get a predefined range of numbers from the aforementioned IDList based on the batchSize/pageSize.

For the 1st loop we get the 1st page filter scope, which covers the IDs in
('IDList[0]', ..., 'IDList[pageSize*1-1]')

For the 2nd loop we get the 2nd page filter scope, which covers the IDs in
('IDList[pageSize*(2-1)]', ..., 'IDList[pageSize*2-1]')

For a middle page, the filter scope would be
('IDList[pageSize*(pageNum-1)]', ..., 'IDList[pageSize*pageNum-1]')

For the last page, it becomes
('IDList[pageSize*(pageNum-1)]', ..., 'IDList[IDListLength-1]')


In this way, we get the IN-condition part of each batched SQL statement, ready to be placed in the WHERE clause.
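
The index arithmetic above can be checked with a short Python sketch. page_bounds is a hypothetical helper, pages are numbered from 1 as in the description, and the returned end index is inclusive, so the last page is simply clipped to IDListLength-1.

```python
import math


def page_bounds(id_list_length, page_size):
    """Return (start, end) index pairs, end inclusive, one pair per page."""
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")
    page_count = math.ceil(id_list_length / page_size)
    bounds = []
    for page_num in range(1, page_count + 1):
        start = page_size * (page_num - 1)
        # A full page ends at pageSize*pageNum-1; the last page may end earlier.
        end = min(page_size * page_num, id_list_length) - 1
        bounds.append((start, end))
    return bounds


id_list = [123, 124, 125, 126, 127]
for start, end in page_bounds(len(id_list), page_size=2):
    page = id_list[start:end + 1]   # Python slices exclude the end index
    print("(" + ", ".join(f"'{i}'" for i in page) + ")")
```

With five IDs and a page size of 2 this yields two full pages and a final page holding the single remaining ID.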

You may refer to the following link and use its attached XML template file for more details.

Hope this may help. 

Contributor

Attached is the XML template file for your reference:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
    <description>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-15
</description>
    <groupId>0a50e507-017e-1000-2e5c-c691a8dbb45b</groupId>
    <name>BatchPreSQL</name>
    <snippet>
        <processGroups>
            <id>eff4ec5e-a8ab-3310-0000-000000000000</id>
            <parentGroupId>5511af2f-6c32-35cc-0000-000000000000</parentGroupId>
            <position>
                <x>0.0</x>
                <y>0.0</y>
            </position>
            <additions/>
            <comments>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-14
</comments>
            <contents>
                <connections>
                    <id>15c43a7e-4e9a-39c0-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <destination>
                        <groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
                        <id>8901cdca-8fdd-3d01-0000-000000000000</id>
                        <type>INPUT_PORT</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <name></name>
                    <selectedRelationships>success</selectedRelationships>
                    <source>
                        <groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
                        <id>30fdeb4d-c403-3862-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <connections>
                    <id>97ca5515-e845-3e39-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <bends>
                        <x>643.1160941528465</x>
                        <y>1122.8689543715004</y>
                    </bends>
                    <destination>
                        <groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
                        <id>e333c1a6-2f02-341e-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <name></name>
                    <source>
                        <groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
                        <id>6ff09bae-c4a1-38c5-0000-000000000000</id>
                        <type>OUTPUT_PORT</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <connections>
                    <id>9874061c-5dd6-3fd9-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <destination>
                        <groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
                        <id>74dbf5d9-50b9-3055-0000-000000000000</id>
                        <type>FUNNEL</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <name></name>
                    <source>
                        <groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
                        <id>b8c0272c-e161-3456-0000-000000000000</id>
                        <type>OUTPUT_PORT</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <connections>
                    <id>e271b74a-2af3-3fcf-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                    <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                    <destination>
                        <groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
                        <id>e333c1a6-2f02-341e-0000-000000000000</id>
                        <type>PROCESSOR</type>
                    </destination>
                    <flowFileExpiration>0 sec</flowFileExpiration>
                    <labelIndex>1</labelIndex>
                    <name></name>
                    <source>
                        <groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
                        <id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
                        <type>OUTPUT_PORT</type>
                    </source>
                    <zIndex>0</zIndex>
                </connections>
                <funnels>
                    <id>74dbf5d9-50b9-3055-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>386.6160941528465</x>
                        <y>1410.3689543715004</y>
                    </position>
                </funnels>
                <labels>
                    <id>087856d3-c058-355e-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>94.61609415284647</x>
                        <y>782.3689543715004</y>
                    </position>
                    <height>255.0</height>
                    <label>1. Generate a request body to mock the client’s JSON request, as follows:
[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</label>
                    <style>
                        <entry>
                            <key>font-size</key>
                            <value>12px</value>
                        </entry>
                    </style>
                    <width>528.0</width>
                </labels>
                <labels>
                    <id>4a9b240d-8ba7-3852-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>-30.383905847153528</x>
                        <y>1472.3689543715004</y>
                    </position>
                    <height>439.0</height>
                    <label>Description on each condition scope of the spawn SQLs
==============================================

The index of data from IDList arrays is listed like

1stPage:  [FROM 0 TO pageSize-1]

2ndPage: [FROM pageSize TO 2*pageSize-1]

...

MiddlePage: [FROM (currentPage-1)*pageSize TO currentPage*pageSize-1]

...

LastPage: [FROM (currentPage-1)*pageSize TO IDListLength-1]

As per the JSONPath grammar, when slicing the array into a sub-array,
the rule we need to know is:
    when using the [I_START:I_END] JSONPath expression to slice the array,
    -&gt; the data we get runs FROM index I_START TO I_END-1,
     which means the index specified after ":" is not included in the slicing result.</label>
                    <style>
                        <entry>
                            <key>font-size</key>
                            <value>24px</value>
                        </entry>
                    </style>
                    <width>1040.0</width>
                </labels>
                <labels>
                    <id>9e75a5d0-b823-3548-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>-414.3839058471535</x>
                        <y>-6.6310914048667655</y>
                    </position>
                    <height>732.0</height>
                    <label>This module (Process Group) demonstrates
how to prepare batched SQL statements when looking up a range of data in a DB.

Once you finish importing this module, here is the guide on

+++++++++++++++++++++++
HOW TO USE THIS MODULE:
1. Start the whole module's processors, check the content/attribute of the flowfile coming from the sub module(01 PrepareBatchedSQLStr)
2. Check the batched sqlStr.
+++++++++++++++++++++++


You can try below stuff, if you'd like to do some further validation on this algorithm: 
1. CHANGE the module variable named batchSize.
With different batchSize settings, you would see the sqlStr being prepared accordingly.
(
    A. Right click on empty area on the canvas of current Module--BatchPreSQL, 
    B. Then select and click "Variables" button to jump into Variables setting page.
    C. Set the batchSize variable to other int values (positive integers, excluding 0) to configure the pageSize of the current algorithm.
        1. batchSize &gt; IDListLength 
        2. batchSize =  IDListLength
        3. batchSize &lt; IDListLength (positive integers, excluding 0, or you may use the abs EL function to support negative values)
    D. Run the whole biz flow to verify such result.
)
2. You may also apply with larger raw json array to run the biz flow.


Please feel free to let me know if you have any question.</label>
                    <style>
                        <entry>
                            <key>background-color</key>
                            <value>#53b3d9</value>
                        </entry>
                        <entry>
                            <key>font-size</key>
                            <value>24px</value>
                        </entry>
                    </style>
                    <width>1514.0</width>
                </labels>
                <processGroups>
                    <id>56e121fa-15d6-3fd4-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>217.61609415284647</x>
                        <y>1104.3689543715004</y>
                    </position>
                    <additions/>
                    <comments></comments>
                    <contents>
                        <connections>
                            <id>07a8f12b-7736-3a6c-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>failure</selectedRelationships>
                            <selectedRelationships>unmatched</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>13b22f01-b24d-359f-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>1736190e-f132-3dde-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name>Finish Batch Loop</name>
                            <selectedRelationships>unmatched</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>37a04d0a-9f84-3813-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>1. Inside the Batch Loop</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>4a4cc1bc-c93c-37f3-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>failure</selectedRelationships>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>4eef65de-4c95-3ca3-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>5cd2322c-6cf8-3117-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>660fbef2-894b-3ba6-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>6a063a4c-e0e0-3f3f-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>73d4d5b3-ed4d-33ef-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>matched</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>84fbfa68-f4f4-3ff8-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>955217f3-8048-314d-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>b447a208-a363-3f24-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>b5babaa5-ec72-3496-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>failure</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>b97777ac-b7ee-30e2-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <connections>
                            <id>fd991255-d565-30f1-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
                            <backPressureObjectThreshold>10000</backPressureObjectThreshold>
                            <destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
                            </destination>
                            <flowFileExpiration>0 sec</flowFileExpiration>
                            <labelIndex>1</labelIndex>
                            <name></name>
                            <selectedRelationships>success</selectedRelationships>
                            <source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
                            </source>
                            <zIndex>0</zIndex>
                        </connections>
                        <funnels>
                            <id>2428e910-0762-38e2-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>1177.000005285659</x>
<y>694.9999968519692</y>
                            </position>
                        </funnels>
                        <inputPorts>
                            <id>8901cdca-8fdd-3d01-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>486.000005285659</x>
<y>46.99999685196917</y>
                            </position>
                            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                            <name>IN</name>
                            <state>RUNNING</state>
                            <type>INPUT_PORT</type>
                        </inputPorts>
                        <labels>
                            <id>29a66daf-8ba8-387c-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>1070.6160941528465</x>
<y>945.3690154066567</y>
                            </position>
                            <height>124.0</height>
                            <label>JSONPath

[start:end]
Selects array elements from the start index up to,
but not including, the end index. Returns a list.

[start:]
If end is omitted, selects all elements
from start through the end of the array. Returns a list.</label>
                            <style>
<entry>
    <key>font-size</key>
    <value>12px</value>
</entry>
                            </style>
                            <width>393.0</width>
                        </labels>
                        <labels>
                            <id>69a045b5-f89a-3bdd-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>707.000005285659</x>
<y>243.99999685196917</y>
                            </position>
                            <height>47.0</height>
                            <label>Extract IDList, the array of all IDs, from the raw data.</label>
                            <style>
<entry>
    <key>font-size</key>
    <value>18px</value>
</entry>
                            </style>
                            <width>732.0</width>
                        </labels>
                        <outputPorts>
                            <id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>1418.7386527465965</x>
<y>683.4886290541176</y>
                            </position>
                            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                            <name>Finish Preparing Batch SQL strs</name>
                            <state>RUNNING</state>
                            <type>OUTPUT_PORT</type>
                        </outputPorts>
                        <outputPorts>
                            <id>6ff09bae-c4a1-38c5-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>-250.38390584715353</x>
<y>340.3689543715004</y>
                            </position>
                            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                            <name>Exceptions</name>
                            <state>RUNNING</state>
                            <type>OUTPUT_PORT</type>
                        </outputPorts>
                        <outputPorts>
                            <id>b8c0272c-e161-3456-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>835.7386527465965</x>
<y>1555.4886290541176</y>
                            </position>
                            <comments></comments>
                            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                            <name>Go2RemainBiz</name>
                            <state>RUNNING</state>
                            <type>OUTPUT_PORT</type>
                        </outputPorts>
                        <processors>
                            <id>19e51c58-e665-33c7-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>883.6160941528465</x>
<y>1151.3689543715004</y>
                            </position>
                            <bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments>If there is only one element in the fetch range, wrap it as ('&lt;LastPage_OnlyOneExpectedValue2Fetch&gt;').</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Delete Attributes Expression</key>
        <value>
            <name>Delete Attributes Expression</name>
        </value>
    </entry>
    <entry>
        <key>Store State</key>
        <value>
            <name>Store State</name>
        </value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
            <name>Stateful Variables Initial Value</name>
        </value>
    </entry>
    <entry>
        <key>sqlStr</key>
        <value>
            <name>sqlStr</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Delete Attributes Expression</key>
    </entry>
    <entry>
        <key>Store State</key>
        <value>Do not store state</value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
    </entry>
    <entry>
        <key>sqlStr</key>
        <value>SELECT 
	* 
FROM 
	&lt;TABLE_NAME&gt;
WHERE
	ID IN ${sqlStrBatchCondition:contains(','):not()
				:ifElse(
					"('${sqlStrBatchCondition}')",
					${sqlStrBatchCondition}
				)
			}</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>prepareSQLStrs</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                        </processors>
                        <processors>
                            <id>4ee2df31-2f9b-34d5-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>564.7386527465965</x>
<y>439.4885985365395</y>
                            </position>
                            <bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Destination</key>
        <value>
            <name>Destination</name>
        </value>
    </entry>
    <entry>
        <key>Return Type</key>
        <value>
            <name>Return Type</name>
        </value>
    </entry>
    <entry>
        <key>Path Not Found Behavior</key>
        <value>
            <name>Path Not Found Behavior</name>
        </value>
    </entry>
    <entry>
        <key>Null Value Representation</key>
        <value>
            <name>Null Value Representation</name>
        </value>
    </entry>
    <entry>
        <key>IDList</key>
        <value>
            <name>IDList</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Destination</key>
        <value>flowfile-attribute</value>
    </entry>
    <entry>
        <key>Return Type</key>
        <value>json</value>
    </entry>
    <entry>
        <key>Path Not Found Behavior</key>
        <value>ignore</value>
    </entry>
    <entry>
        <key>Null Value Representation</key>
        <value>empty string</value>
    </entry>
    <entry>
        <key>IDList</key>
        <value>$.IDList</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>EvaluateJsonPath</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
                            </relationships>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
                            </relationships>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
                        </processors>
                        <processors>
                            <id>60fb5121-b8bc-394a-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>893.7386527465965</x>
<y>1365.4886290541176</y>
                            </position>
                            <bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Regular Expression</key>
        <value>
            <name>Regular Expression</name>
        </value>
    </entry>
    <entry>
        <key>Replacement Value</key>
        <value>
            <name>Replacement Value</name>
        </value>
    </entry>
    <entry>
        <key>Character Set</key>
        <value>
            <name>Character Set</name>
        </value>
    </entry>
    <entry>
        <key>Maximum Buffer Size</key>
        <value>
            <name>Maximum Buffer Size</name>
        </value>
    </entry>
    <entry>
        <key>Replacement Strategy</key>
        <value>
            <name>Replacement Strategy</name>
        </value>
    </entry>
    <entry>
        <key>Evaluation Mode</key>
        <value>
            <name>Evaluation Mode</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Regular Expression</key>
        <value>(?s)(^.*$)</value>
    </entry>
    <entry>
        <key>Replacement Value</key>
        <value>==========
TotalPage: ${totalPage}
CurrentPage: ${currentPage}
PageSize: ${batchSize}
==========
SQL is:
${sqlStr}</value>
    </entry>
    <entry>
        <key>Character Set</key>
        <value>UTF-8</value>
    </entry>
    <entry>
        <key>Maximum Buffer Size</key>
        <value>1 MB</value>
    </entry>
    <entry>
        <key>Replacement Strategy</key>
        <value>Always Replace</value>
    </entry>
    <entry>
        <key>Evaluation Mode</key>
        <value>Entire text</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>PrintResult</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
                            </relationships>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.standard.ReplaceText</type>
                        </processors>
                        <processors>
                            <id>6837ac99-5ce2-3c47-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>245.00000528565897</x>
<y>227.99999685196917</y>
                            </position>
                            <bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Delete Attributes Expression</key>
        <value>
            <name>Delete Attributes Expression</name>
        </value>
    </entry>
    <entry>
        <key>Store State</key>
        <value>
            <name>Store State</name>
        </value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
            <name>Stateful Variables Initial Value</name>
        </value>
    </entry>
    <entry>
        <key>msg</key>
        <value>
            <name>msg</name>
        </value>
    </entry>
    <entry>
        <key>msgCode</key>
        <value>
            <name>msgCode</name>
        </value>
    </entry>
    <entry>
        <key>status</key>
        <value>
            <name>status</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Delete Attributes Expression</key>
    </entry>
    <entry>
        <key>Store State</key>
        <value>Do not store state</value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
    </entry>
    <entry>
        <key>msg</key>
        <value>Failed to extract IDList from raw data via JOLT.</value>
    </entry>
    <entry>
        <key>msgCode</key>
        <value>01:01</value>
    </entry>
    <entry>
        <key>status</key>
        <value>failed</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>01 PrepareExceptionInfo</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                        </processors>
                        <processors>
                            <id>97407168-ea79-32f1-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>868.7386527465965</x>
<y>670.4886290541176</y>
                            </position>
                            <bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Routing Strategy</key>
        <value>
            <name>Routing Strategy</name>
        </value>
    </entry>
    <entry>
        <key>1. Inside the Batch Loop</key>
        <value>
            <name>1. Inside the Batch Loop</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Routing Strategy</key>
        <value>Route to Property name</value>
    </entry>
    <entry>
        <key>1. Inside the Batch Loop</key>
        <value>${currentPage:le(${totalPage})}</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>RouteOnAttribute</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>1. Inside the Batch Loop</name>
                            </relationships>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
                        </processors>
                        <processors>
                            <id>9ef7cadf-476e-3437-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>247.00000528565897</x>
<y>439.9999968519692</y>
                            </position>
                            <bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Delete Attributes Expression</key>
        <value>
            <name>Delete Attributes Expression</name>
        </value>
    </entry>
    <entry>
        <key>Store State</key>
        <value>
            <name>Store State</name>
        </value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
            <name>Stateful Variables Initial Value</name>
        </value>
    </entry>
    <entry>
        <key>msg</key>
        <value>
            <name>msg</name>
        </value>
    </entry>
    <entry>
        <key>msgCode</key>
        <value>
            <name>msgCode</name>
        </value>
    </entry>
    <entry>
        <key>status</key>
        <value>
            <name>status</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Delete Attributes Expression</key>
    </entry>
    <entry>
        <key>Store State</key>
        <value>Do not store state</value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
    </entry>
    <entry>
        <key>msg</key>
        <value>Failed to extract IDList from flowfile content to attribute.</value>
    </entry>
    <entry>
        <key>msgCode</key>
        <value>01:02</value>
    </entry>
    <entry>
        <key>status</key>
        <value>failed</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>02 PrepareExceptionInfo</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                        </processors>
                        <processors>
                            <id>a37238fd-6b1a-334b-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>558.6317191528465</x>
<y>227.36889333634417</y>
                            </position>
                            <bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments>Extract IDList, the array of all IDs, from the raw data.</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>jolt-transform</key>
        <value>
            <name>jolt-transform</name>
        </value>
    </entry>
    <entry>
        <key>jolt-custom-class</key>
        <value>
            <name>jolt-custom-class</name>
        </value>
    </entry>
    <entry>
        <key>jolt-custom-modules</key>
        <value>
            <name>jolt-custom-modules</name>
        </value>
    </entry>
    <entry>
        <key>jolt-spec</key>
        <value>
            <name>jolt-spec</name>
        </value>
    </entry>
    <entry>
        <key>Transform Cache Size</key>
        <value>
            <name>Transform Cache Size</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>jolt-transform</key>
        <value>jolt-transform-chain</value>
    </entry>
    <entry>
        <key>jolt-custom-class</key>
    </entry>
    <entry>
        <key>jolt-custom-modules</key>
    </entry>
    <entry>
        <key>jolt-spec</key>
        <value>[{
    "operation": "shift",
    "spec": {
      "*":{
      	"ID":"IDList"
      }
    }
  }
]</value>
    </entry>
    <entry>
        <key>Transform Cache Size</key>
        <value>1</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>JoltTransformJSON</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
                            </relationships>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.standard.JoltTransformJSON</type>
                        </processors>
                        <processors>
                            <id>e4d04817-e7d4-34a9-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>577.7386527465965</x>
<y>946.4886290541176</y>
                            </position>
                            <bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Delete Attributes Expression</key>
        <value>
            <name>Delete Attributes Expression</name>
        </value>
    </entry>
    <entry>
        <key>Store State</key>
        <value>
            <name>Store State</name>
        </value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
            <name>Stateful Variables Initial Value</name>
        </value>
    </entry>
    <entry>
        <key>currentPage</key>
        <value>
            <name>currentPage</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Delete Attributes Expression</key>
    </entry>
    <entry>
        <key>Store State</key>
        <value>Do not store state</value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
    </entry>
    <entry>
        <key>currentPage</key>
        <value>${currentPage:plus(1)}</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>ScrollcurrentPage</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                        </processors>
                        <processors>
                            <id>ed04498f-82f7-37f7-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>874.7386527465965</x>
<y>947.4886290541176</y>
                            </position>
                            <bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<annotationData>&lt;criteria&gt;
    &lt;flowFilePolicy&gt;USE_ORIGINAL&lt;/flowFilePolicy&gt;
    &lt;rules&gt;
        &lt;actions&gt;
            &lt;attribute&gt;sqlStrBatchCondition&lt;/attribute&gt;
            &lt;id&gt;570f5b7b-f835-4c8d-8f5d-e08deda1c7c1&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath("$[0:${IDListLength}]")
	:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${currentPage:equals(1)}&lt;/expression&gt;
            &lt;id&gt;97d055e9-12d2-439c-93a4-8642f0bebd6d&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${IDListLength:le(${pageSize})}&lt;/expression&gt;
            &lt;id&gt;fa0053b2-a183-4f33-9788-3fe68ccae5f1&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;id&gt;c0bb8d5c-cb73-40f5-8323-632122cb75ac&lt;/id&gt;
        &lt;name&gt;1.1 First Page &amp;amp;&amp;amp; IDListLength &amp;lt;= pageSize &lt;/name&gt;
    &lt;/rules&gt;
    &lt;rules&gt;
        &lt;actions&gt;
            &lt;attribute&gt;sqlStrBatchCondition&lt;/attribute&gt;
            &lt;id&gt;570f5b7b-f835-4c8d-8f5d-e08deda1c7c1&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath("$[0:${pageSize}]")
	:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${currentPage:equals(1)}&lt;/expression&gt;
            &lt;id&gt;97d055e9-12d2-439c-93a4-8642f0bebd6d&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${IDListLength:le(${pageSize}):not()}&lt;/expression&gt;
            &lt;id&gt;fa0053b2-a183-4f33-9788-3fe68ccae5f1&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;id&gt;b351555e-2dd8-4303-bd05-6aff43731a38&lt;/id&gt;
        &lt;name&gt;1.2 First Page &amp;amp;&amp;amp; IDListLength &amp;gt; pageSize &lt;/name&gt;
    &lt;/rules&gt;
    &lt;rules&gt;
        &lt;actions&gt;
            &lt;attribute&gt;sqlStrBatchCondition&lt;/attribute&gt;
            &lt;id&gt;570f5b7b-f835-4c8d-8f5d-e08deda1c7c1&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${currentPage:minus(1):multiply(${pageSize}):plus(${pageSize})}]")
	:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${currentPage:lt(${totalPage})}&lt;/expression&gt;
            &lt;id&gt;46a93c40-dac1-4502-8dac-ceb2b12a2756&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${currentPage:gt(1)}&lt;/expression&gt;
            &lt;id&gt;97d055e9-12d2-439c-93a4-8642f0bebd6d&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;id&gt;e19f2512-98be-47fa-8e0b-76339348f853&lt;/id&gt;
        &lt;name&gt;2. MiddlePage&lt;/name&gt;
    &lt;/rules&gt;
    &lt;rules&gt;
        &lt;actions&gt;
            &lt;attribute&gt;sqlStrBatchCondition&lt;/attribute&gt;
            &lt;id&gt;570f5b7b-f835-4c8d-8f5d-e08deda1c7c1&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${IDListLength}]")
	:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${currentPage:equals(${totalPage})}&lt;/expression&gt;
            &lt;id&gt;97d055e9-12d2-439c-93a4-8642f0bebd6d&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${totalPage:equals(1):not()}&lt;/expression&gt;
            &lt;id&gt;46a93c40-dac1-4502-8dac-ceb2b12a2756&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;id&gt;0d277b81-3121-4d87-844d-4dcc1325fb2c&lt;/id&gt;
        &lt;name&gt;3. LastPage &amp;amp;&amp;amp; totalPage !=1&lt;/name&gt;
    &lt;/rules&gt;
&lt;/criteria&gt;</annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments>SELECT 
	* 
FROM 
	&lt;TABLE_NAME&gt;
WHERE
	ID IN ${index:equals(${specificFlag:replaceEmpty('')})
            :ifElse(
              ${IDList:jsonPath("$[${index:multiply(${batchSize})}:${IDListLength:minus(1)}]")
                    :replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
                },
              ${IDList:jsonPath("$[${index:multiply(${batchSize})}:${index:multiply(${batchSize}):plus(${batchSize})}]")
                    :replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
                }
            )
         }</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Delete Attributes Expression</key>
        <value>
            <name>Delete Attributes Expression</name>
        </value>
    </entry>
    <entry>
        <key>Store State</key>
        <value>
            <name>Store State</name>
        </value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
            <name>Stateful Variables Initial Value</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>0 sec</penaltyDuration>
<properties>
    <entry>
        <key>Delete Attributes Expression</key>
    </entry>
    <entry>
        <key>Store State</key>
        <value>Do not store state</value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>prepareWhereClauseConditionPart</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                        </processors>
                        <processors>
                            <id>f784d98d-4eaf-395c-0000-000000000000</id>
                            <parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
                            <position>
<x>566.7386527465965</x>
<y>666.4886290541176</y>
                            </position>
                            <bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
                            </bundle>
                            <config>
<annotationData>&lt;criteria&gt;
    &lt;flowFilePolicy&gt;USE_ORIGINAL&lt;/flowFilePolicy&gt;
    &lt;rules&gt;
        &lt;actions&gt;
            &lt;attribute&gt;totalPage&lt;/attribute&gt;
            &lt;id&gt;0c63c6dd-1748-4e21-8e35-e8645341dd48&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath('$.length()'):divide(${batchSize})}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0')}&lt;/expression&gt;
            &lt;id&gt;20c60e9a-db68-44b1-a22b-e3f83a57dd8a&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;id&gt;2fb2c755-2084-409d-b48e-176751d78930&lt;/id&gt;
        &lt;name&gt;1. IDList Length Mod batchSize = 0&lt;/name&gt;
    &lt;/rules&gt;
    &lt;rules&gt;
        &lt;actions&gt;
            &lt;attribute&gt;specialPage&lt;/attribute&gt;
            &lt;id&gt;4f763b71-bb46-472c-8364-86e6fbd293fe&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath('$.length()'):divide(${batchSize})}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;actions&gt;
            &lt;attribute&gt;totalPage&lt;/attribute&gt;
            &lt;id&gt;0c63c6dd-1748-4e21-8e35-e8645341dd48&lt;/id&gt;
            &lt;value&gt;${IDList:jsonPath('$.length()'):divide(${batchSize}):plus(1)}&lt;/value&gt;
        &lt;/actions&gt;
        &lt;conditions&gt;
            &lt;expression&gt;${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0'):not()}&lt;/expression&gt;
            &lt;id&gt;20c60e9a-db68-44b1-a22b-e3f83a57dd8a&lt;/id&gt;
        &lt;/conditions&gt;
        &lt;id&gt;403feae6-e423-4b8b-b15a-e8a93dac5fd7&lt;/id&gt;
        &lt;name&gt;2. 1. IDList Length Mod batchSize != 0&lt;/name&gt;
    &lt;/rules&gt;
&lt;/criteria&gt;</annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
    <entry>
        <key>Delete Attributes Expression</key>
        <value>
            <name>Delete Attributes Expression</name>
        </value>
    </entry>
    <entry>
        <key>Store State</key>
        <value>
            <name>Store State</name>
        </value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
            <name>Stateful Variables Initial Value</name>
        </value>
    </entry>
    <entry>
        <key>currentPage</key>
        <value>
            <name>currentPage</name>
        </value>
    </entry>
    <entry>
        <key>IDListLength</key>
        <value>
            <name>IDListLength</name>
        </value>
    </entry>
    <entry>
        <key>pageSize</key>
        <value>
            <name>pageSize</name>
        </value>
    </entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
    <entry>
        <key>Delete Attributes Expression</key>
    </entry>
    <entry>
        <key>Store State</key>
        <value>Do not store state</value>
    </entry>
    <entry>
        <key>Stateful Variables Initial Value</key>
    </entry>
    <entry>
        <key>currentPage</key>
        <value>1</value>
    </entry>
    <entry>
        <key>IDListLength</key>
        <value>${IDList:jsonPath('$.length()')}</value>
    </entry>
    <entry>
        <key>pageSize</key>
        <value>${batchSize}</value>
    </entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
                            </config>
                            <executionNodeRestricted>false</executionNodeRestricted>
                            <name>InitializeTheBatchLoop</name>
                            <relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
                            </relationships>
                            <state>RUNNING</state>
                            <style/>
                            <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
                        </processors>
                    </contents>
                    <name>01 PrepareBatchedSQLStr</name>
                    <variables/>
                </processGroups>
                <processors>
                    <id>30fdeb4d-c403-3862-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>351.6160941528465</x>
                        <y>863.3689543715004</y>
                    </position>
                    <bundle>
                        <artifact>nifi-standard-nar</artifact>
                        <group>org.apache.nifi</group>
                        <version>1.7.1-2.0_09000</version>
                    </bundle>
                    <config>
                        <bulletinLevel>WARN</bulletinLevel>
                        <comments></comments>
                        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                        <descriptors>
                            <entry>
<key>File Size</key>
<value>
    <name>File Size</name>
</value>
                            </entry>
                            <entry>
<key>Batch Size</key>
<value>
    <name>Batch Size</name>
</value>
                            </entry>
                            <entry>
<key>Data Format</key>
<value>
    <name>Data Format</name>
</value>
                            </entry>
                            <entry>
<key>Unique FlowFiles</key>
<value>
    <name>Unique FlowFiles</name>
</value>
                            </entry>
                            <entry>
<key>generate-ff-custom-text</key>
<value>
    <name>generate-ff-custom-text</name>
</value>
                            </entry>
                            <entry>
<key>character-set</key>
<value>
    <name>character-set</name>
</value>
                            </entry>
                            <entry>
<key>mime.type</key>
<value>
    <name>mime.type</name>
</value>
                            </entry>
                            <entry>
<key>test</key>
<value>
    <name>test</name>
</value>
                            </entry>
                        </descriptors>
                        <executionNode>ALL</executionNode>
                        <lossTolerant>false</lossTolerant>
                        <penaltyDuration>30 sec</penaltyDuration>
                        <properties>
                            <entry>
<key>File Size</key>
<value>0B</value>
                            </entry>
                            <entry>
<key>Batch Size</key>
<value>1</value>
                            </entry>
                            <entry>
<key>Data Format</key>
<value>Text</value>
                            </entry>
                            <entry>
<key>Unique FlowFiles</key>
<value>false</value>
                            </entry>
                            <entry>
<key>generate-ff-custom-text</key>
<value>[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</value>
                            </entry>
                            <entry>
<key>character-set</key>
<value>UTF-8</value>
                            </entry>
                            <entry>
<key>mime.type</key>
<value>application/json;charset=utf-8</value>
                            </entry>
                            <entry>
<key>test</key>
<value>${literal('ab')
:toUpper()}</value>
                            </entry>
                        </properties>
                        <runDurationMillis>0</runDurationMillis>
                        <schedulingPeriod>1 d</schedulingPeriod>
                        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                        <yieldDuration>1 sec</yieldDuration>
                    </config>
                    <executionNodeRestricted>false</executionNodeRestricted>
                    <name>GenerateFlowFile</name>
                    <relationships>
                        <autoTerminate>false</autoTerminate>
                        <name>success</name>
                    </relationships>
                    <state>RUNNING</state>
                    <style/>
                    <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
                </processors>
                <processors>
                    <id>e333c1a6-2f02-341e-0000-000000000000</id>
                    <parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
                    <position>
                        <x>824.6160941528465</x>
                        <y>1161.3689543715004</y>
                    </position>
                    <bundle>
                        <artifact>nifi-standard-nar</artifact>
                        <group>org.apache.nifi</group>
                        <version>1.7.1-2.0_09000</version>
                    </bundle>
                    <config>
                        <bulletinLevel>WARN</bulletinLevel>
                        <comments></comments>
                        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
                        <descriptors>
                            <entry>
<key>log-level</key>
<value>
    <name>log-level</name>
</value>
                            </entry>
                            <entry>
<key>log-prefix</key>
<value>
    <name>log-prefix</name>
</value>
                            </entry>
                            <entry>
<key>log-message</key>
<value>
    <name>log-message</name>
</value>
                            </entry>
                        </descriptors>
                        <executionNode>ALL</executionNode>
                        <lossTolerant>false</lossTolerant>
                        <penaltyDuration>30 sec</penaltyDuration>
                        <properties>
                            <entry>
<key>log-level</key>
<value>info</value>
                            </entry>
                            <entry>
<key>log-prefix</key>
<value>### RESULT ###</value>
                            </entry>
                            <entry>
<key>log-message</key>
<value>${status:replaceEmpty('Success')}${msg}${msgCode}</value>
                            </entry>
                        </properties>
                        <runDurationMillis>0</runDurationMillis>
                        <schedulingPeriod>0 sec</schedulingPeriod>
                        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
                        <yieldDuration>1 sec</yieldDuration>
                    </config>
                    <executionNodeRestricted>false</executionNodeRestricted>
                    <name>LogMessage</name>
                    <relationships>
                        <autoTerminate>true</autoTerminate>
                        <name>success</name>
                    </relationships>
                    <state>RUNNING</state>
                    <style/>
                    <type>org.apache.nifi.processors.standard.LogMessage</type>
                </processors>
            </contents>
            <name>BatchPreSQL</name>
            <variables>
                <entry>
                    <key>batchSize</key>
                    <value>4</value>
                </entry>
            </variables>
        </processGroups>
    </snippet>
    <timestamp>01/15/2022 02:19:55 CST</timestamp>
</template>
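
For anyone reading the template rather than importing it, the paging arithmetic in the UpdateAttribute rules boils down to roughly the following. This is only a sketch in plain JavaScript, not something taken from the template: the function names totalPages and pageCondition are made up for illustration, and it assumes the IDs have already been pulled out of the JSON into an array. The template itself does the same thing with Expression Language (jsonPath slices plus replaceAll).

```javascript
// Hypothetical helper names; the template does this with Expression Language.

// Number of pages needed: length / batchSize, plus one if there is a remainder.
function totalPages(idCount, batchSize) {
  var full = Math.floor(idCount / batchSize);
  return idCount % batchSize === 0 ? full : full + 1;
}

// Build the quoted IN-list for a 1-based page, e.g. ('123','124').
function pageCondition(ids, currentPage, pageSize) {
  var start = (currentPage - 1) * pageSize;
  var page = ids.slice(start, start + pageSize); // slice stops at the end of the array
  return "('" + page.join("','") + "')";
}

var ids = [123, 124, 125, 126, 127]; // IDs from the GenerateFlowFile sample data
var batchSize = 4;                   // value of the batchSize variable in the template

for (var p = 1; p <= totalPages(ids.length, batchSize); p++) {
  console.log("ID IN " + pageCondition(ids, p, batchSize));
}
```

With the sample data and batchSize = 4, that gives two pages: the first with four IDs and the last with the remaining one.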

 

avatar
Explorer

For others who stumble across this, I ended up delving into the scripting processors and implemented a script that does the batching:

 

var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")

// Grab up to 50 flow files from the input queue (or whatever's available)
var flowFileList = session.get(50)
if (!flowFileList.isEmpty()) {
	
	var ids = [];
	for each (var flowFile in flowFileList) { 
		// Create a new InputStreamCallback, passing in a function to define the interface method
		session.read(flowFile, new InputStreamCallback(function(inputStream) {
			// Get the JSON out of the flow file and convert it to a JS object
			var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
			var obj = JSON.parse(text);
			// Collect the ID only if the expected field is present
			if (obj.hasOwnProperty('SourceDataElementValue')) {
				ids.push(obj.SourceDataElementValue);
			}
		}));
		
		// Eat the flow file after the TML ID is extracted.
		session.remove(flowFile);
	}
	
	if (ids.length > 0) {
		// join() with no argument separates the values with commas
		var attributeValue = ids.join();
		var outputFlowFile = session.create();
		outputFlowFile = session.putAttribute(outputFlowFile, 'tml_list', attributeValue);
		session.transfer(outputFlowFile, REL_SUCCESS)
	}
}
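
If it helps to see the batching on its own, here is a rough plain-JavaScript version of the same idea with the NiFi session calls taken out, so it can be run outside NiFi. The helper name batchIds and the sample documents are made up for illustration; only the SourceDataElementValue field name comes from the script above.

```javascript
// Plain-JavaScript sketch of the batching step; no NiFi classes involved.
// `batchIds` is a hypothetical helper, not part of any NiFi API.
function batchIds(jsonDocs, batchSize) {
  var batches = [];
  for (var i = 0; i < jsonDocs.length; i += batchSize) {
    var ids = [];
    jsonDocs.slice(i, i + batchSize).forEach(function (text) {
      var obj = JSON.parse(text);
      // Skip documents that do not carry the ID field
      if (Object.prototype.hasOwnProperty.call(obj, 'SourceDataElementValue')) {
        ids.push(obj.SourceDataElementValue);
      }
    });
    if (ids.length > 0) {
      batches.push(ids.join()); // comma-separated, like the tml_list attribute
    }
  }
  return batches;
}

// Made-up sample input: one JSON document per incoming flow file
var docs = [
  '{"SourceDataElementValue": 123}',
  '{"SourceDataElementValue": 124}',
  '{"other": "no id here"}',
  '{"SourceDataElementValue": 125}'
];
var lists = batchIds(docs, 2);
console.log(lists);
```

Each resulting string is what would end up in tml_list, so the downstream query would presumably look like SELECT * FROM TABLE_NAME WHERE KEY IN (${tml_list}). That works as-is for numeric IDs; string keys would need quoting first.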