Created 01-11-2022 08:50 AM
I have a flow that extracts an ID from a flow file, then runs an ExecuteSQL to fetch some additional data. Since this is one at a time, it's very inefficient (~300 records per minute, which doesn't sound bad until it's processing a million records).
Is there a way to run flow files through a process and extract the required value in batches of 50 or 100, so that it returns a comma-separated value (like 123, 124, 125, 126, ...) to pass to the ExecuteSQL, so it returns data in larger batches instead of one at a time?
Currently, it's doing an EvaluateJsonPath, pulling that one value to a property so the ExecuteSQL processor can construct the SQL like "SELECT * FROM TABLE_NAME WHERE KEY = ${key}". I'd like to modify that so it's "...WHERE KEY IN (${keyList})" and keyList is the comma-separated list created above.
Not sure how to approach this, however.
Created 01-12-2022 12:15 PM
I was looking at the scripting capabilities... is something like the outline below possible? (pseudocode, obviously)
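while (file = get next flow file from input) {
    data = ...extract field...
    append data to string
    if (extracted 50 or more things) {
        emit new flowfile with string as attribute
        clear string and counter
    }
}
if (string is not empty) {
    emit new flowfile with remaining string as attribute
}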
I've seen plenty of examples of what I'd consider "transform a single flowfile into something else", but none where a script fetches another flowfile from the session with "flowfile = session.get();", and it's unclear whether you can do that multiple times in the same script.
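The accumulate-and-emit loop outlined above can be checked as plain Python, outside NiFi. This is a minimal sketch under assumptions: each record is a dict carrying a hypothetical `ID` field, and "emitting a flow file" is modelled as yielding a comma-separated string. It is not NiFi scripting code. Inside NiFi, note that `ProcessSession` also declares `get(int maxResults)`, which returns a list of up to that many flow files in a single call.

```python
from typing import Iterable, Iterator, List, Mapping


def batch_ids(records: Iterable[Mapping], batch_size: int = 50,
              field: str = "ID") -> Iterator[str]:
    """Yield comma-separated ID lists of at most batch_size items.

    `field` is a hypothetical name for the extracted value; adjust it to
    whatever EvaluateJsonPath currently pulls out of each flow file.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    buffer: List[str] = []
    for record in records:
        buffer.append(str(record[field]))  # "append data to string"
        if len(buffer) >= batch_size:      # batch is full: emit it
            yield ", ".join(buffer)
            buffer = []                    # start the next batch empty
    if buffer:                             # flush the final partial batch
        yield ", ".join(buffer)


records = [{"ID": i} for i in range(123, 128)]
print(list(batch_ids(records, batch_size=2)))  # ['123, 124', '125, 126', '127']
```

The final flush matters: without it, the last partial batch (fewer than `batch_size` IDs) would never be emitted.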
Created on 01-14-2022 11:50 AM - edited 01-14-2022 11:55 AM
Perhaps we could first read the flow file content as JSON and then parse it into a JSON object
(e.g. if using a Groovy script, we might use JsonSlurper to parse flow file content that is valid JSON).
The rest is then an iteration loop over its ID array, following the same paging logic to distinguish the different pages.
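As a rough Python analogue of the JsonSlurper idea (the answer suggests Groovy; this is not that script), the content is parsed once and its ID array is split into pages. It assumes the content is a JSON array of objects with an `ID` key, like the sample request in the attached template; `paged_ids` is a hypothetical helper name.

```python
import json
from typing import List


def paged_ids(content: str, page_size: int) -> List[List[int]]:
    """Parse a JSON array of objects and split its IDs into pages."""
    if page_size <= 0:
        raise ValueError("page_size must be a positive integer")
    records = json.loads(content)               # comparable to JsonSlurper.parseText
    ids = [record["ID"] for record in records]  # the ID array
    # Python slices exclude the end index, as JSONPath [start:end] does
    return [ids[start:start + page_size] for start in range(0, len(ids), page_size)]


sample = ('[{"ID":123,"params":"xxx"},{"ID":124,"params":"xxx"},'
          '{"ID":125,"params":"xxx"},{"ID":126,"params":"xxx"},'
          '{"ID":127,"params":"xxx"}]')
print(paged_ids(sample, 2))  # [[123, 124], [125, 126], [127]]
```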
Created 01-14-2022 11:34 AM
Here is a custom way to implement such an algorithm without writing any Groovy/Java/Python/etc. script.
1. Manually design an iteration loop to prepare the paged/batched SQL string.
2. Inside the loop, focus on manipulating the pageNumber (the loop index) to prepare each SQL statement accordingly.
For instance, we can use a JOLT specification in a JoltTransformJSON processor to group those IDs into an IDList array; the result would look like [123,124,125,...].
After that, we can design an iteration loop that takes a predefined range of elements from the IDList based on the batchSize/pageSize.
For the 1st loop, we get the 1st page filter scope, which covers the IDs in
('IDList[0]', ..., 'IDList[pageSize*1-1]')
For the 2nd loop, we get the 2nd page filter scope, which covers the IDs in
('IDList[pageSize*(2-1)]', ..., 'IDList[pageSize*2-1]')
For a middle page, the filter scope would be
('IDList[pageSize*(pageNum-1)]', ..., 'IDList[pageSize*pageNum-1]')
For the last page, it becomes
('IDList[pageSize*(pageNum-1)]', ..., 'IDList[IDListLength-1]')
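The page-scope formulas above can be checked with a short sketch that computes the inclusive (first, last) IDList indexes for every page. The function name `page_bounds` is hypothetical; the total page count is assumed to be the ceiling of IDListLength / pageSize.

```python
from typing import List, Tuple


def page_bounds(id_list_length: int, page_size: int) -> List[Tuple[int, int]]:
    """Return the inclusive (first, last) IDList indexes for each page.

    Page pageNum covers IDList[pageSize*(pageNum-1)] .. IDList[pageSize*pageNum-1],
    except the last page, which stops at IDList[IDListLength-1].
    """
    if page_size <= 0:
        raise ValueError("page_size must be a positive integer")
    total_pages = (id_list_length + page_size - 1) // page_size  # ceiling division
    bounds = []
    for page_num in range(1, total_pages + 1):
        first = page_size * (page_num - 1)
        last = min(page_size * page_num, id_list_length) - 1  # clamp on the last page
        bounds.append((first, last))
    return bounds


print(page_bounds(5, 2))  # [(0, 1), (2, 3), (4, 4)]
```

Because a JSONPath slice [start:end] excludes `end`, the matching slice for each page is [first:last+1].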
In this way, we get the IN condition of each batched SQL statement, which is used to build its WHERE clause.
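Once a page of IDs is known, producing the statement the question asks for (SELECT * FROM TABLE_NAME WHERE KEY IN (...)) is string assembly. This sketch is illustrative only: `in_clause_sql` is a hypothetical name, and it assumes numeric IDs and trusted table/column names taken from the question.

```python
from typing import Iterable


def in_clause_sql(ids: Iterable[int], table: str = "TABLE_NAME",
                  key: str = "KEY") -> str:
    """Build "SELECT * FROM <table> WHERE <key> IN (...)" for one batch.

    `table` and `key` are treated as trusted placeholders, not user input.
    """
    # Coerce each ID with int(): non-numeric text raises ValueError
    # instead of being spliced into the SQL. String keys should use
    # bind parameters rather than this approach.
    values = [str(int(i)) for i in ids]
    if not values:
        raise ValueError("an IN list needs at least one value")
    # A single value still gets parentheses, e.g. "IN (127)"
    return f"SELECT * FROM {table} WHERE {key} IN ({', '.join(values)})"


print(in_clause_sql([123, 124, 125]))
# SELECT * FROM TABLE_NAME WHERE KEY IN (123, 124, 125)
```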
You may refer to the following link and use its attached XML template file for more details.
Hope this may help.
Created 01-14-2022 11:37 AM
Attached is the XML template file for your reference:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-15
</description>
<groupId>0a50e507-017e-1000-2e5c-c691a8dbb45b</groupId>
<name>BatchPreSQL</name>
<snippet>
<processGroups>
<id>eff4ec5e-a8ab-3310-0000-000000000000</id>
<parentGroupId>5511af2f-6c32-35cc-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<additions/>
<comments>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-14
</comments>
<contents>
<connections>
<id>15c43a7e-4e9a-39c0-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>30fdeb4d-c403-3862-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>97ca5515-e845-3e39-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>643.1160941528465</x>
<y>1122.8689543715004</y>
</bends>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>9874061c-5dd6-3fd9-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>74dbf5d9-50b9-3055-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>e271b74a-2af3-3fcf-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>74dbf5d9-50b9-3055-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>386.6160941528465</x>
<y>1410.3689543715004</y>
</position>
</funnels>
<labels>
<id>087856d3-c058-355e-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>94.61609415284647</x>
<y>782.3689543715004</y>
</position>
<height>255.0</height>
<label>1. Generate a request body that mocks the client’s JSON request as follows:
[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>528.0</width>
</labels>
<labels>
<id>4a9b240d-8ba7-3852-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>-30.383905847153528</x>
<y>1472.3689543715004</y>
</position>
<height>439.0</height>
<label>Description of the condition scope of each spawned SQL
==============================================
The IDList array indexes covered by each page are:
1stPage: [FROM 0 TO pageSize-1]
2ndPage: [FROM pageSize TO 2*pageSize-1]
...
MiddlePage: [FROM (currentPage-1)*pageSize TO currentPage*pageSize-1]
...
LastPage: [FROM (currentPage-1)*pageSize TO IDListLength-1]
Per the JSONPath grammar, when slicing the array into a sub-array,
the rule to keep in mind is:
when using a [I_START:I_END] JSONPath slice,
-> the data we get is FROM index I_START TO I_END-1,
which means the index specified after ":" is not included in the slicing result.</label>
<style>
<entry>
<key>font-size</key>
<value>24px</value>
</entry>
</style>
<width>1040.0</width>
</labels>
<labels>
<id>9e75a5d0-b823-3548-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>-414.3839058471535</x>
<y>-6.6310914048667655</y>
</position>
<height>732.0</height>
<label>This module (Process Group) demonstrates how to
prepare batched SQLs when looking up a range of data in a DB.
Once you finish importing this module, here is the guide:
+++++++++++++++++++++++
HOW TO USE THIS MODULE:
1. Start all of the module's processors, then check the content/attributes of the flowfile coming from the sub module (01 PrepareBatchedSQLStr).
2. Check the batched sqlStr.
+++++++++++++++++++++++
If you'd like to validate this algorithm further, you can try the following:
1. CHANGE the module variable named batchSize.
With different batchSize settings, you will see the sqlStr prepared accordingly.
(
A. Right-click on an empty area of the canvas of the current module (BatchPreSQL).
B. Select "Variables" to open the Variables settings page.
C. Set the batchSize variable to other integer values (positive integers, excluding 0) to configure the pageSize of the algorithm:
1. batchSize > IDListLength
2. batchSize = IDListLength
3. batchSize < IDListLength (positive integers excluding 0; or you may use the abs EL function to support negative values)
D. Run the whole flow to verify the result.
)
2. You may also run the flow with a larger raw JSON array.
Please feel free to let me know if you have any questions.</label>
<style>
<entry>
<key>background-color</key>
<value>#53b3d9</value>
</entry>
<entry>
<key>font-size</key>
<value>24px</value>
</entry>
</style>
<width>1514.0</width>
</labels>
<processGroups>
<id>56e121fa-15d6-3fd4-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>217.61609415284647</x>
<y>1104.3689543715004</y>
</position>
<additions/>
<comments></comments>
<contents>
<connections>
<id>07a8f12b-7736-3a6c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>13b22f01-b24d-359f-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>1736190e-f132-3dde-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name>Finish Batch Loop</name>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>37a04d0a-9f84-3813-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>1. Inside the Batch Loop</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4a4cc1bc-c93c-37f3-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4eef65de-4c95-3ca3-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5cd2322c-6cf8-3117-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>660fbef2-894b-3ba6-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6a063a4c-e0e0-3f3f-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>73d4d5b3-ed4d-33ef-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>84fbfa68-f4f4-3ff8-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>955217f3-8048-314d-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b447a208-a363-3f24-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b5babaa5-ec72-3496-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b97777ac-b7ee-30e2-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>fd991255-d565-30f1-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>2428e910-0762-38e2-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1177.000005285659</x>
<y>694.9999968519692</y>
</position>
</funnels>
<inputPorts>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>486.000005285659</x>
<y>46.99999685196917</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>IN</name>
<state>RUNNING</state>
<type>INPUT_PORT</type>
</inputPorts>
<labels>
<id>29a66daf-8ba8-387c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1070.6160941528465</x>
<y>945.3690154066567</y>
</position>
<height>124.0</height>
<label>JSONPath
[start:end]
Selects array elements from the start index up to,
but not including, the end index.
[start:]
If end is omitted,
selects all elements from start until the end of the array. Returns a list.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>393.0</width>
</labels>
<labels>
<id>69a045b5-f89a-3bdd-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>707.000005285659</x>
<y>243.99999685196917</y>
</position>
<height>47.0</height>
<label>Here we extract the IDList, which contains all of the IDs from the raw data.</label>
<style>
<entry>
<key>font-size</key>
<value>18px</value>
</entry>
</style>
<width>732.0</width>
</labels>
<outputPorts>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1418.7386527465965</x>
<y>683.4886290541176</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Finish Preparing Batch SQL strs</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<outputPorts>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>-250.38390584715353</x>
<y>340.3689543715004</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Exceptions</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<outputPorts>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>835.7386527465965</x>
<y>1555.4886290541176</y>
</position>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Go2RemainBiz</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<processors>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>883.6160941528465</x>
<y>1151.3689543715004</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>If there is only 1 element in the fetching scope, we surround it with ('&lt;LastPage_OnlyOneExpectedValue2Fetch&gt;')</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>sqlStr</key>
<value>
<name>sqlStr</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>sqlStr</key>
<value>SELECT
*
FROM
&lt;TABLE_NAME&gt;
WHERE
ID IN ${sqlStrBatchCondition:contains(','):not()
:ifElse(
"('${sqlStrBatchCondition}')",
${sqlStrBatchCondition}
)
}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>prepareSQLStrs</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>564.7386527465965</x>
<y>439.4885985365395</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>IDList</key>
<value>
<name>IDList</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>json</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>IDList</key>
<value>$.IDList</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>893.7386527465965</x>
<y>1365.4886290541176</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>==========
TotalPage: ${totalPage}
CurrentPage: ${currentPage}
PageSize:${batchSize}
==========
SQL is:
${sqlStr}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Always Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PrintResult</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>245.00000528565897</x>
<y>227.99999685196917</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>msg</key>
<value>
<name>msg</name>
</value>
</entry>
<entry>
<key>msgCode</key>
<value>
<name>msgCode</name>
</value>
</entry>
<entry>
<key>status</key>
<value>
<name>status</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>msg</key>
<value>Failed to extract IDList from raw data via JOLT.</value>
</entry>
<entry>
<key>msgCode</key>
<value>01:01</value>
</entry>
<entry>
<key>status</key>
<value>failed</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>01 PrepareExceptionInfo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>97407168-ea79-32f1-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>868.7386527465965</x>
<y>670.4886290541176</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>1. Inside the Batch Loop</key>
<value>
<name>1. Inside the Batch Loop</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>1. Inside the Batch Loop</key>
<value>${currentPage:le(${totalPage})}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>1. Inside the Batch Loop</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>247.00000528565897</x>
<y>439.9999968519692</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>msg</key>
<value>
<name>msg</name>
</value>
</entry>
<entry>
<key>msgCode</key>
<value>
<name>msgCode</name>
</value>
</entry>
<entry>
<key>status</key>
<value>
<name>status</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>msg</key>
<value>Failed to extract IDList from flowfile content to attribute.</value>
</entry>
<entry>
<key>msgCode</key>
<value>01:02</value>
</entry>
<entry>
<key>status</key>
<value>failed</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>02 PrepareExceptionInfo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>558.6317191528465</x>
<y>227.36889333634417</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>Extract IDList, the array containing every ID in the raw data.</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>jolt-transform</key>
<value>
<name>jolt-transform</name>
</value>
</entry>
<entry>
<key>jolt-custom-class</key>
<value>
<name>jolt-custom-class</name>
</value>
</entry>
<entry>
<key>jolt-custom-modules</key>
<value>
<name>jolt-custom-modules</name>
</value>
</entry>
<entry>
<key>jolt-spec</key>
<value>
<name>jolt-spec</name>
</value>
</entry>
<entry>
<key>Transform Cache Size</key>
<value>
<name>Transform Cache Size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>jolt-transform</key>
<value>jolt-transform-chain</value>
</entry>
<entry>
<key>jolt-custom-class</key>
</entry>
<entry>
<key>jolt-custom-modules</key>
</entry>
<entry>
<key>jolt-spec</key>
<value>[{
"operation": "shift",
"spec": {
"*":{
"ID":"IDList"
}
}
}
]</value>
</entry>
<entry>
<key>Transform Cache Size</key>
<value>1</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>JoltTransformJSON</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.JoltTransformJSON</type>
</processors>
<processors>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>577.7386527465965</x>
<y>946.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>currentPage</key>
<value>
<name>currentPage</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>currentPage</key>
<value>${currentPage:plus(1)}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ScrollcurrentPage</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>874.7386527465965</x>
<y>947.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<annotationData><criteria>
<flowFilePolicy>USE_ORIGINAL</flowFilePolicy>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[0:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}
</value>
</actions>
<conditions>
<expression>${currentPage:equals(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${IDListLength:le(${pageSize})}</expression>
<id>fa0053b2-a183-4f33-9788-3fe68ccae5f1</id>
</conditions>
<id>c0bb8d5c-cb73-40f5-8323-632122cb75ac</id>
<name>1.1 First Page &amp;&amp; IDListLength &lt;= pageSize </name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[0:${pageSize}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${IDListLength:le(${pageSize}):not()}</expression>
<id>fa0053b2-a183-4f33-9788-3fe68ccae5f1</id>
</conditions>
<id>b351555e-2dd8-4303-bd05-6aff43731a38</id>
<name>1.2 First Page &amp;&amp; IDListLength &gt; pageSize </name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${currentPage:minus(1):multiply(${pageSize}):plus(${pageSize})}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:lt(${totalPage})}</expression>
<id>46a93c40-dac1-4502-8dac-ceb2b12a2756</id>
</conditions>
<conditions>
<expression>${currentPage:gt(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<id>e19f2512-98be-47fa-8e0b-76339348f853</id>
<name>2. MiddlePage</name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(${totalPage})}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${totalPage:equals(1):not()}</expression>
<id>46a93c40-dac1-4502-8dac-ceb2b12a2756</id>
</conditions>
<id>0d277b81-3121-4d87-844d-4dcc1325fb2c</id>
<name>3. LastPage &amp;&amp; totalPage !=1</name>
</rules>
</criteria></annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments>SELECT
*
FROM
&lt;TABLE_NAME&gt;
WHERE
ID IN ${index:equals(${specificFlag:replaceEmpty('')})
:ifElse(
${IDList:jsonPath("$[${index:multiply(${batchSize})}:${IDListLength:minus(1)}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
},
${IDList:jsonPath("$[${index:multiply(${batchSize})}:${index:multiply(${batchSize}):plus(${batchSize})}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}
)
}</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>0 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>prepareWhereClauseConditionPart</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>566.7386527465965</x>
<y>666.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<annotationData><criteria>
<flowFilePolicy>USE_ORIGINAL</flowFilePolicy>
<rules>
<actions>
<attribute>totalPage</attribute>
<id>0c63c6dd-1748-4e21-8e35-e8645341dd48</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize})}</value>
</actions>
<conditions>
<expression>${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0')}</expression>
<id>20c60e9a-db68-44b1-a22b-e3f83a57dd8a</id>
</conditions>
<id>2fb2c755-2084-409d-b48e-176751d78930</id>
<name>1. IDList Length Mod batchSize = 0</name>
</rules>
<rules>
<actions>
<attribute>specialPage</attribute>
<id>4f763b71-bb46-472c-8364-86e6fbd293fe</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize})}</value>
</actions>
<actions>
<attribute>totalPage</attribute>
<id>0c63c6dd-1748-4e21-8e35-e8645341dd48</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize}):plus(1)}</value>
</actions>
<conditions>
<expression>${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0'):not()}</expression>
<id>20c60e9a-db68-44b1-a22b-e3f83a57dd8a</id>
</conditions>
<id>403feae6-e423-4b8b-b15a-e8a93dac5fd7</id>
<name>2. IDList Length Mod batchSize != 0</name>
</rules>
</criteria></annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>currentPage</key>
<value>
<name>currentPage</name>
</value>
</entry>
<entry>
<key>IDListLength</key>
<value>
<name>IDListLength</name>
</value>
</entry>
<entry>
<key>pageSize</key>
<value>
<name>pageSize</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>currentPage</key>
<value>1</value>
</entry>
<entry>
<key>IDListLength</key>
<value>${IDList:jsonPath('$.length()')}</value>
</entry>
<entry>
<key>pageSize</key>
<value>${batchSize}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>InitializeTheBatchLoop</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
</contents>
<name>01 PrepareBatchedSQLStr</name>
<variables/>
</processGroups>
<processors>
<id>30fdeb4d-c403-3862-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>351.6160941528465</x>
<y>863.3689543715004</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
<entry>
<key>mime.type</key>
<value>
<name>mime.type</name>
</value>
</entry>
<entry>
<key>test</key>
<value>
<name>test</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</value>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>mime.type</key>
<value>application/json;charset=utf-8</value>
</entry>
<entry>
<key>test</key>
<value>${literal('ab')
:toUpper()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>824.6160941528465</x>
<y>1161.3689543715004</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>log-level</key>
<value>
<name>log-level</name>
</value>
</entry>
<entry>
<key>log-prefix</key>
<value>
<name>log-prefix</name>
</value>
</entry>
<entry>
<key>log-message</key>
<value>
<name>log-message</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>log-level</key>
<value>info</value>
</entry>
<entry>
<key>log-prefix</key>
<value>### RESULT ###</value>
</entry>
<entry>
<key>log-message</key>
<value>${status:replaceEmpty('Success')}${msg}${msgCode}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>LogMessage</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.LogMessage</type>
</processors>
</contents>
<name>BatchPreSQL</name>
<variables>
<entry>
<key>batchSize</key>
<value>4</value>
</entry>
</variables>
</processGroups>
</snippet>
<timestamp>01/15/2022 02:19:55 CST</timestamp>
</template>
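The template's paging arithmetic is spread across several processors, so here it is restated in plain JavaScript. This is a minimal sketch, not part of the template: the helper names `extractIdList`, `totalPages`, `pageSlice` and `inClause` are hypothetical, and the input is the sample content from the GenerateFlowFile processor with `batchSize` = 4, as set in the process group variables.

```javascript
// Minimal sketch of the template's paging logic in plain JavaScript.
// Helper names are hypothetical; comments name the NiFi processor each one mirrors.

// JoltTransformJSON (shift: every element's "ID" -> "IDList"): collect all IDs.
function extractIdList(records) {
  return records.map(function (r) { return r.ID; });
}

// InitializeTheBatchLoop: integer division, plus 1 when there is a remainder.
function totalPages(length, batchSize) {
  var whole = Math.floor(length / batchSize);
  return length % batchSize === 0 ? whole : whole + 1;
}

// prepareWhereClauseConditionPart: the IDs on a 1-based page, equivalent to the
// jsonPath slice $[(page-1)*size : (page-1)*size + size]. slice() clamps the end
// index to the array length, which covers the separate "last page" rule.
function pageSlice(idList, page, pageSize) {
  var start = (page - 1) * pageSize;
  return idList.slice(start, start + pageSize);
}

// Same result as the replaceAll() chain: [123,124] -> ('123','124')
function inClause(ids) {
  return "('" + ids.join("','") + "')";
}

// Sample content from GenerateFlowFile, with batchSize = 4.
var records = [
  { ID: 123, params: "xxx" }, { ID: 124, params: "xxx" }, { ID: 125, params: "xxx" },
  { ID: 126, params: "xxx" }, { ID: 127, params: "xxx" }
];
var batchSize = 4;
var idList = extractIdList(records);
var total = totalPages(idList.length, batchSize);
var clauses = [];
// RouteOnAttribute + ScrollcurrentPage: loop while currentPage <= totalPage.
for (var currentPage = 1; currentPage <= total; currentPage++) {
  clauses.push("SELECT * FROM TABLE_NAME WHERE ID IN " +
    inClause(pageSlice(idList, currentPage, batchSize)));
}
console.log(clauses.join("\n"));
```

With five IDs and a page size of 4, this produces two statements: one for `('123','124','125','126')` and one for `('127')`, which is what the PrintResult processor shows for each loop iteration.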
Created 01-19-2022 07:51 AM
For others who stumble across this: I ended up delving into the scripting processors and implemented an ExecuteScript script (ECMAScript engine) that does the batching:
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")
// Grab up to 50 flow files from the input queue (or whatever's available)
var flowFileList = session.get(50)
if (!flowFileList.isEmpty()) {
  var ids = [];
  for each (var flowFile in flowFileList) {
    // Create a new InputStreamCallback, passing in a function to define the interface method
    session.read(flowFile, new InputStreamCallback(function(inputStream) {
      // Get the JSON out of the flow file and convert it to a JS object
      var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
      var obj = JSON.parse(text);
      if (obj.hasOwnProperty('SourceDataElementValue')) {
        ids.push(obj.SourceDataElementValue);
      }
    }));
    // Drop the input flow file once its ID has been extracted
    // (flow files without SourceDataElementValue are dropped as well)
    session.remove(flowFile);
  }
  if (ids.length > 0) {
    // Comma-separated list such as "123,124,125" for ...WHERE KEY IN (${tml_list})
    var attributeValue = ids.join(',');
    var outputFlowFile = session.create();
    outputFlowFile = session.putAttribute(outputFlowFile, 'tml_list', attributeValue);
    session.transfer(outputFlowFile, REL_SUCCESS);
  }
}
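The core of that script (collect one field from each JSON document, skip documents without it, join the values with commas, one output per batch) can be exercised outside NiFi as a plain function. This is a minimal sketch: `batchIds` and the sample documents are hypothetical, and it assumes the queue is drained in full batches, whereas in the real script each `session.get(50)` call may return fewer flow files and the result becomes the `tml_list` attribute.

```javascript
// Minimal sketch of the ExecuteScript batching logic, runnable outside NiFi.
// batchIds is a hypothetical helper, not a NiFi API.
function batchIds(jsonTexts, batchSize) {
  var batches = [];
  // Each pass corresponds to one script run that takes up to batchSize flow files.
  for (var start = 0; start < jsonTexts.length; start += batchSize) {
    var ids = [];
    jsonTexts.slice(start, start + batchSize).forEach(function (text) {
      var obj = JSON.parse(text);
      // Documents without the field contribute nothing (the script still removes them).
      if (obj.hasOwnProperty("SourceDataElementValue")) {
        ids.push(obj.SourceDataElementValue);
      }
    });
    // Like the script, emit nothing for a batch that yielded no IDs.
    if (ids.length > 0) {
      batches.push(ids.join(","));
    }
  }
  return batches;
}

// Hypothetical input: six documents, one of them missing the field.
var docs = [
  '{"SourceDataElementValue":123}',
  '{"SourceDataElementValue":124}',
  '{"other":"no id here"}',
  '{"SourceDataElementValue":125}',
  '{"SourceDataElementValue":126}',
  '{"SourceDataElementValue":127}'
];
var lists = batchIds(docs, 3); // returns ["123,124", "125,126,127"]
console.log(lists);
```

Downstream, ExecuteSQL can then reference the attribute directly, e.g. `SELECT * FROM TABLE_NAME WHERE KEY IN (${tml_list})`, which is the query shape asked about at the start of the thread.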