Member since
01-14-2022
14
Posts
6
Kudos Received
2
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 6878 | 02-14-2022 03:40 AM |
| | 4083 | 01-16-2022 09:48 PM |
02-14-2022
03:40 AM
1 Kudo
Thank you for your question. You may try using the UpdateAttribute processor's stateful values to handle the incoming flow files in batches.

============================
Settings for UpdateAttribute
============================
In the Advanced mode of the UpdateAttribute processor, set two rules as below:

R0 -> initializeBatchIndex
Conditions:
${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})})}
Actions (add fragment-related attributes):
fragment.count
${batchSize}
fragment.identifier (a new UUID is generated as the identifier for each batch)
${UUID()}
fragment.index
${getStateValue('fragment.index'):plus(1):mod(${batchSize})}

R1 -> Iterations
Conditions:
${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})}):not()}
Actions (add fragment-related attributes):
fragment.count (this one may be optional, as it is always the same within one batch)
${getStateValue('fragment.count')}
fragment.identifier
${getStateValue('fragment.identifier')}
fragment.index
${getStateValue('fragment.index'):plus(1):mod(${batchSize})}

NOTE: Before that, set a variable in your current Process Group (right-click an empty area inside the process group, select Variables, and add a variable named batchSize with the merge count you want).

Flow files that share the same fragment.identifier are then merged together.

Please let me know if this helps.

Thanks & Regards,
Oliver Gong
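To see how the two rules assign batches, here is a minimal simulation in plain Python (not NiFi code). It assumes the processor stores state, that the stateful initial value is -1, and that flow files arrive one at a time; the function name assign_fragment_attributes is only illustrative.

```python
import uuid


def assign_fragment_attributes(flowfile_count, batch_size):
    """Simulate rules R0/R1 over a stream of flow files (illustrative only)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    # Stored state; -1 stands in for the assumed initial stateful value.
    state = {"fragment.index": -1, "fragment.count": None, "fragment.identifier": None}
    results = []
    for _ in range(flowfile_count):
        previous_index = state["fragment.index"]
        # R0 condition: nothing stored yet, or the previous batch is full.
        starts_new_batch = previous_index == -1 or previous_index + 1 >= batch_size
        if starts_new_batch:
            count, identifier = batch_size, str(uuid.uuid4())
        else:
            # R1: reuse the count and identifier kept in state.
            count, identifier = state["fragment.count"], state["fragment.identifier"]
        attributes = {
            "fragment.count": count,
            "fragment.identifier": identifier,
            "fragment.index": (previous_index + 1) % batch_size,
        }
        state = dict(attributes)  # the new values become the stored state
        results.append(attributes)
    return results


if __name__ == "__main__":
    for attrs in assign_fragment_attributes(7, 3):
        print(attrs["fragment.index"], attrs["fragment.identifier"])
```

With a batch size of 3, every third flow file starts a new identifier and the index cycles through 0, 1, 2.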
01-17-2022
06:46 AM
1 Kudo
Glad to hear that works for you!

Although we can split the raw CSV or JSON content into smaller pieces to avoid an OOM issue during the JoltTransformJSON shift, doing so may cause other issues:
- The result from JoltTransformJSON may be incomplete. That is, part of the same user's data (with a different bill_date) may end up in other flow files. In that case, we need to merge them back into a whole.
- It would be better to keep the raw CSV data in a DB table and then pull the rows out with a specified limit, i.e. query the table page by page.
- We can then easily fetch a logically complete set of data for the remaining shift steps. With a complete result, we don't need to worry about working on partial data.
01-16-2022
11:13 PM
2 Kudos
Attached are the corresponding screenshots covering the implementation details.
0. The whole data flow
1. GenerateFlowFile
2. ConvertRecord
2.1 CSV Reader (LG CSVReader)
2.2 JsonRecordSetWriter (LG JsonRecordSetWriter)
3. JoltTransformJSON
3.1 JoltTransformJSON - Advanced Mode
Should you have any questions, feel free to contact me.
Thanks,
Oliver Gong
01-16-2022
09:48 PM
1 Kudo
Thank you for your question. The following steps should make it work as expected.
1. Convert the CSV to JSON (you may use ConvertRecord) to get JSON like the following:
[{
"mobile": "9876543210",
"username": "John",
"bill_id": "AB10",
"bill_date": "2020-12-23",
"product_id": "X34",
"product_price": "500"
}, {
"mobile": "9876543210",
"username": "John",
"bill_id": "AB10",
"bill_date": "2020-12-23",
"product_id": "X35",
"product_price": "230"
}, {
"mobile": "9876543210",
"username": "John",
"bill_id": "AB22",
"bill_date": "2020-11-14",
"product_id": "X89",
"product_price": "700"
}, {
"mobile": "9999999999",
"username": "Alice",
"bill_id": "AC54",
"bill_date": "2019-12-20",
"product_id": "X10",
"product_price": "109"
}]
2. Then use JoltTransformJSON to do the shift. The JOLT specification is:
[ {
"operation" : "shift",
"spec" : {
"*" : {
"*" : "@(1,username).@(1,mobile).@(1,bill_id).@(1,bill_date).@(1,product_id).&"
}
}
}, {
"operation" : "shift",
"spec" : {
"*" : {
"$" : "users.[#2].username",
"*" : {
"$" : "users.[#3].mobile",
"*" : {
"*" : {
"$" : "users.[#5].bills.[#3].bill_date",
"$1" : "users.[#5].bills.[#3].bill_id",
"*" : {
"product_*" : "users.[#6].bills.[#4].products.[#2].&"
}
}
}
}
}
}
} ]
NOTE:
- The first shift groups the raw data by the "username + mobile + bill_id + bill_date + product_id" fields.
- That gives us the phase-1 shifted data, keyed by those unique values.
- The phase-2 shift then lays the data out in the structure inferred from your attached sample output.
3. For details on the JOLT spec, you may refer to the following link: https://jolt-demo.appspot.com/
Hope this helps.
Thanks
Oliver Gong
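To make the intended grouping concrete, here is a small Python sketch (not JOLT) that produces the same kind of nesting: users, then bills, then products. The output layout is an assumption inferred from the target paths in the spec above (users[].bills[].products[]), and group_rows is a hypothetical helper name.

```python
def group_rows(rows):
    """Group flat bill rows into users -> bills -> products (illustrative only)."""
    users = {}
    for row in rows:
        # One user per (username, mobile) pair.
        user = users.setdefault(
            (row["username"], row["mobile"]),
            {"username": row["username"], "mobile": row["mobile"], "bills": {}},
        )
        # One bill per (bill_id, bill_date) pair within that user.
        bill = user["bills"].setdefault(
            (row["bill_id"], row["bill_date"]),
            {"bill_date": row["bill_date"], "bill_id": row["bill_id"], "products": []},
        )
        # Keep only the product_* fields, as the "product_*" match does.
        bill["products"].append(
            {key: value for key, value in row.items() if key.startswith("product_")}
        )
    return {
        "users": [
            {
                "username": user["username"],
                "mobile": user["mobile"],
                "bills": list(user["bills"].values()),
            }
            for user in users.values()
        ]
    }
```

Running it on the four sample rows from step 1 yields two users, with John holding two bills and the first of those bills holding two products.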
01-14-2022
11:50 AM
Perhaps we could first read the flow file content as JSON and then parse it into a JSON object (e.g. in a Groovy script, we might use JsonSlurper to handle flow file content that is valid JSON). The rest is then a loop over its ID array, following the same logic to distinguish the different pages.
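As a rough illustration of that idea, here is a minimal Python sketch (the post suggests Groovy; Python is used here only to show the logic). It assumes the content is a JSON array of objects that each carry an "ID" field, and page_ids and page_size are hypothetical names.

```python
import json


def page_ids(raw_json, page_size):
    """Parse the JSON content and split its ID values into pages (illustrative only)."""
    if page_size <= 0:
        raise ValueError("page_size must be a positive integer")
    records = json.loads(raw_json)
    ids = [record["ID"] for record in records]
    # Each slice [start:start + page_size] excludes its end index,
    # so the last page simply holds whatever IDs remain.
    return [ids[start:start + page_size] for start in range(0, len(ids), page_size)]


if __name__ == "__main__":
    body = '[{"ID":123,"params":"xxx"},{"ID":124,"params":"xxx"},{"ID":125,"params":"xxx"}]'
    for page in page_ids(body, 2):
        print(page)
```

Each page can then be turned into its own query condition.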
01-14-2022
11:37 AM
Attached is the XML template file for your reference:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-15
</description>
<groupId>0a50e507-017e-1000-2e5c-c691a8dbb45b</groupId>
<name>BatchPreSQL</name>
<snippet>
<processGroups>
<id>eff4ec5e-a8ab-3310-0000-000000000000</id>
<parentGroupId>5511af2f-6c32-35cc-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<additions/>
<comments>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-14
</comments>
<contents>
<connections>
<id>15c43a7e-4e9a-39c0-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>30fdeb4d-c403-3862-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>97ca5515-e845-3e39-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>643.1160941528465</x>
<y>1122.8689543715004</y>
</bends>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>9874061c-5dd6-3fd9-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>74dbf5d9-50b9-3055-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>e271b74a-2af3-3fcf-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>74dbf5d9-50b9-3055-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>386.6160941528465</x>
<y>1410.3689543715004</y>
</position>
</funnels>
<labels>
<id>087856d3-c058-355e-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>94.61609415284647</x>
<y>782.3689543715004</y>
</position>
<height>255.0</height>
<label>1. Generate a request body to mock the client’s json request as following:
[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>528.0</width>
</labels>
<labels>
<id>4a9b240d-8ba7-3852-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>-30.383905847153528</x>
<y>1472.3689543715004</y>
</position>
<height>439.0</height>
<label>Description on each condition scope of the spawn SQLs
==============================================
The index of data from IDList arrays is listed like
1stPage: [FROM 0 TO pageSize-1]
2ndPage: [FROM pageSize TO 2*pageSize-1]
...
MiddlePage: [FROM (currentPage-1)*pageSize TO currentPage*pageSize-1]
...
LastPage: [FROM (currentPage-1)*pageSize TO IDListLength-1 ]
As per the JSONPath grammar, to slice the array into a sub-array,
we need to know the following:
when using the [I_START:I_END] JSONPath to slice the array,
-> the data we get runs FROM index I_START TO I_END-1,
which means the index specified after ":" is not included in the slicing result. </label>
<style>
<entry>
<key>font-size</key>
<value>24px</value>
</entry>
</style>
<width>1040.0</width>
</labels>
<labels>
<id>9e75a5d0-b823-3548-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>-414.3839058471535</x>
<y>-6.6310914048667655</y>
</position>
<height>732.0</height>
<label>This module (Process Group) demonstrates how to prepare batched SQL statements
when looking up a range of data in a DB.
Once you finish importing this module, here is the guide on
+++++++++++++++++++++++
HOW TO USE THIS MODULE:
1. Start all of the module's processors, then check the content/attributes of the flowfile coming from the sub module (01 PrepareBatchedSQLStr).
2. Check the batched sqlStr.
+++++++++++++++++++++++
You can try the following if you'd like to validate this algorithm further:
1. CHANGE the module variable named batchSize.
With different batchSize settings, you will see the sqlStr being prepared accordingly.
(
A. Right-click an empty area on the canvas of the current module, BatchPreSQL.
B. Select "Variables" to open the Variables settings page.
C. Set the batchSize variable to other int values (positive integers, excluding 0) to configure the pageSize of the algorithm:
1. batchSize &gt; IDListLength
2. batchSize = IDListLength
3. batchSize &lt; IDListLength (positive integers, excluding 0; or you may use the abs EL function to support negative values)
D. Run the whole biz flow to verify the result.
)
2. You may also run the biz flow with a larger raw JSON array.
Please feel free to let me know if you have any questions.</label>
<style>
<entry>
<key>background-color</key>
<value>#53b3d9</value>
</entry>
<entry>
<key>font-size</key>
<value>24px</value>
</entry>
</style>
<width>1514.0</width>
</labels>
<processGroups>
<id>56e121fa-15d6-3fd4-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>217.61609415284647</x>
<y>1104.3689543715004</y>
</position>
<additions/>
<comments></comments>
<contents>
<connections>
<id>07a8f12b-7736-3a6c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>13b22f01-b24d-359f-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>1736190e-f132-3dde-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name>Finish Batch Loop</name>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>37a04d0a-9f84-3813-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>1. Inside the Batch Loop</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4a4cc1bc-c93c-37f3-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4eef65de-4c95-3ca3-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5cd2322c-6cf8-3117-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>660fbef2-894b-3ba6-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6a063a4c-e0e0-3f3f-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>73d4d5b3-ed4d-33ef-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>84fbfa68-f4f4-3ff8-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>955217f3-8048-314d-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b447a208-a363-3f24-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b5babaa5-ec72-3496-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b97777ac-b7ee-30e2-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>fd991255-d565-30f1-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>2428e910-0762-38e2-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1177.000005285659</x>
<y>694.9999968519692</y>
</position>
</funnels>
<inputPorts>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>486.000005285659</x>
<y>46.99999685196917</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>IN</name>
<state>RUNNING</state>
<type>INPUT_PORT</type>
</inputPorts>
<labels>
<id>29a66daf-8ba8-387c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1070.6160941528465</x>
<y>945.3690154066567</y>
</position>
<height>124.0</height>
<label>JSONPath
[start:end]
Selects array elements from the start index and up to,
but not including, end index.
[start:]
If end is omitted,
selects all elements from start until the end of the array. Returns a list.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>393.0</width>
</labels>
<labels>
<id>69a045b5-f89a-3bdd-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>707.000005285659</x>
<y>243.99999685196917</y>
</position>
<height>47.0</height>
<label>Here we extract the IDList, which contains all the IDs from the raw data.</label>
<style>
<entry>
<key>font-size</key>
<value>18px</value>
</entry>
</style>
<width>732.0</width>
</labels>
<outputPorts>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1418.7386527465965</x>
<y>683.4886290541176</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Finish Preparing Batch SQL strs</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<outputPorts>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>-250.38390584715353</x>
<y>340.3689543715004</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Exceptions</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<outputPorts>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>835.7386527465965</x>
<y>1555.4886290541176</y>
</position>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Go2RemainBiz</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<processors>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>883.6160941528465</x>
<y>1151.3689543715004</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>If there is only 1 element in the fetching scope, we surround it with ('&lt;LastPage_OnlyOneExpectedValue2Fetch&gt;')</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>sqlStr</key>
<value>
<name>sqlStr</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>sqlStr</key>
<value>SELECT
*
FROM
&lt;TABLE_NAME&gt;
WHERE
ID IN ${sqlStrBatchCondition:contains(','):not()
:ifElse(
"('${sqlStrBatchCondition}')",
${sqlStrBatchCondition}
)
}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>prepareSQLStrs</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>564.7386527465965</x>
<y>439.4885985365395</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>IDList</key>
<value>
<name>IDList</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>json</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>IDList</key>
<value>$.IDList</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>893.7386527465965</x>
<y>1365.4886290541176</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>==========
TotalPage: ${totalPage}
CurrentPage: ${currentPage}
PageSize: ${batchSize}
==========
SQL is:
${sqlStr}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Always Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PrintResult</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>245.00000528565897</x>
<y>227.99999685196917</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>msg</key>
<value>
<name>msg</name>
</value>
</entry>
<entry>
<key>msgCode</key>
<value>
<name>msgCode</name>
</value>
</entry>
<entry>
<key>status</key>
<value>
<name>status</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>msg</key>
<value>Failed to extract IDList from raw data via JOLT.</value>
</entry>
<entry>
<key>msgCode</key>
<value>01:01</value>
</entry>
<entry>
<key>status</key>
<value>failed</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>01 PrepareExceptionInfo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>97407168-ea79-32f1-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>868.7386527465965</x>
<y>670.4886290541176</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>1. Inside the Batch Loop</key>
<value>
<name>1. Inside the Batch Loop</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>1. Inside the Batch Loop</key>
<value>${currentPage:le(${totalPage})}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>1. Inside the Batch Loop</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>247.00000528565897</x>
<y>439.9999968519692</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>msg</key>
<value>
<name>msg</name>
</value>
</entry>
<entry>
<key>msgCode</key>
<value>
<name>msgCode</name>
</value>
</entry>
<entry>
<key>status</key>
<value>
<name>status</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>msg</key>
<value>Failed to extract IDList from flowfile content to attribute.</value>
</entry>
<entry>
<key>msgCode</key>
<value>01:02</value>
</entry>
<entry>
<key>status</key>
<value>failed</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>02 PrepareExceptionInfo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>558.6317191528465</x>
<y>227.36889333634417</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>Extract the IDList, which contains all of the IDs from the raw data.</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>jolt-transform</key>
<value>
<name>jolt-transform</name>
</value>
</entry>
<entry>
<key>jolt-custom-class</key>
<value>
<name>jolt-custom-class</name>
</value>
</entry>
<entry>
<key>jolt-custom-modules</key>
<value>
<name>jolt-custom-modules</name>
</value>
</entry>
<entry>
<key>jolt-spec</key>
<value>
<name>jolt-spec</name>
</value>
</entry>
<entry>
<key>Transform Cache Size</key>
<value>
<name>Transform Cache Size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>jolt-transform</key>
<value>jolt-transform-chain</value>
</entry>
<entry>
<key>jolt-custom-class</key>
</entry>
<entry>
<key>jolt-custom-modules</key>
</entry>
<entry>
<key>jolt-spec</key>
<value>[{
"operation": "shift",
"spec": {
"*":{
"ID":"IDList"
}
}
}
]</value>
</entry>
<entry>
<key>Transform Cache Size</key>
<value>1</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>JoltTransformJSON</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.JoltTransformJSON</type>
</processors>
<processors>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>577.7386527465965</x>
<y>946.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>currentPage</key>
<value>
<name>currentPage</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>currentPage</key>
<value>${currentPage:plus(1)}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ScrollcurrentPage</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>874.7386527465965</x>
<y>947.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<annotationData><criteria>
<flowFilePolicy>USE_ORIGINAL</flowFilePolicy>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[0:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${IDListLength:le(${pageSize})}</expression>
<id>fa0053b2-a183-4f33-9788-3fe68ccae5f1</id>
</conditions>
<id>c0bb8d5c-cb73-40f5-8323-632122cb75ac</id>
<name>1.1 First Page &amp;&amp; IDListLength &lt;= pageSize </name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[0:${pageSize}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${IDListLength:le(${pageSize}):not()}</expression>
<id>fa0053b2-a183-4f33-9788-3fe68ccae5f1</id>
</conditions>
<id>b351555e-2dd8-4303-bd05-6aff43731a38</id>
<name>1.2 First Page &amp;&amp; IDListLength &gt; pageSize </name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${currentPage:minus(1):multiply(${pageSize}):plus(${pageSize})}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:lt(${totalPage})}</expression>
<id>46a93c40-dac1-4502-8dac-ceb2b12a2756</id>
</conditions>
<conditions>
<expression>${currentPage:gt(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<id>e19f2512-98be-47fa-8e0b-76339348f853</id>
<name>2. MiddlePage</name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(${totalPage})}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${totalPage:equals(1):not()}</expression>
<id>46a93c40-dac1-4502-8dac-ceb2b12a2756</id>
</conditions>
<id>0d277b81-3121-4d87-844d-4dcc1325fb2c</id>
<name>3. LastPage &amp;&amp; totalPage !=1</name>
</rules>
</criteria></annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments>SELECT
*
FROM
&lt;TABLE_NAME&gt;
WHERE
ID IN ${index:equals(${specificFlag:replaceEmpty('')})
:ifElse(
${IDList:jsonPath("$[${index:multiply(${batchSize})}:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
},
${IDList:jsonPath("$[${index:multiply(${batchSize})}:${index:multiply(${batchSize}):plus(${batchSize})}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}
)
}</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>0 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>prepareWhereClauseConditionPart</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>566.7386527465965</x>
<y>666.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<annotationData><criteria>
<flowFilePolicy>USE_ORIGINAL</flowFilePolicy>
<rules>
<actions>
<attribute>totalPage</attribute>
<id>0c63c6dd-1748-4e21-8e35-e8645341dd48</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize})}</value>
</actions>
<conditions>
<expression>${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0')}</expression>
<id>20c60e9a-db68-44b1-a22b-e3f83a57dd8a</id>
</conditions>
<id>2fb2c755-2084-409d-b48e-176751d78930</id>
<name>1. IDList Length Mod batchSize = 0</name>
</rules>
<rules>
<actions>
<attribute>specialPage</attribute>
<id>4f763b71-bb46-472c-8364-86e6fbd293fe</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize})}</value>
</actions>
<actions>
<attribute>totalPage</attribute>
<id>0c63c6dd-1748-4e21-8e35-e8645341dd48</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize}):plus(1)}</value>
</actions>
<conditions>
<expression>${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0'):not()}</expression>
<id>20c60e9a-db68-44b1-a22b-e3f83a57dd8a</id>
</conditions>
<id>403feae6-e423-4b8b-b15a-e8a93dac5fd7</id>
<name>2. IDList Length Mod batchSize != 0</name>
</rules>
</criteria></annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>currentPage</key>
<value>
<name>currentPage</name>
</value>
</entry>
<entry>
<key>IDListLength</key>
<value>
<name>IDListLength</name>
</value>
</entry>
<entry>
<key>pageSize</key>
<value>
<name>pageSize</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>currentPage</key>
<value>1</value>
</entry>
<entry>
<key>IDListLength</key>
<value>${IDList:jsonPath('$.length()')}</value>
</entry>
<entry>
<key>pageSize</key>
<value>${batchSize}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>InitializeTheBatchLoop</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
</contents>
<name>01 PrepareBatchedSQLStr</name>
<variables/>
</processGroups>
<processors>
<id>30fdeb4d-c403-3862-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>351.6160941528465</x>
<y>863.3689543715004</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
<entry>
<key>mime.type</key>
<value>
<name>mime.type</name>
</value>
</entry>
<entry>
<key>test</key>
<value>
<name>test</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</value>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>mime.type</key>
<value>application/json;charset=utf-8</value>
</entry>
<entry>
<key>test</key>
<value>${literal('ab')
:toUpper()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>824.6160941528465</x>
<y>1161.3689543715004</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>log-level</key>
<value>
<name>log-level</name>
</value>
</entry>
<entry>
<key>log-prefix</key>
<value>
<name>log-prefix</name>
</value>
</entry>
<entry>
<key>log-message</key>
<value>
<name>log-message</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>log-level</key>
<value>info</value>
</entry>
<entry>
<key>log-prefix</key>
<value>### RESULT ###</value>
</entry>
<entry>
<key>log-message</key>
<value>${status:replaceEmpty('Success')}${msg}${msgCode}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>LogMessage</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.LogMessage</type>
</processors>
</contents>
<name>BatchPreSQL</name>
<variables>
<entry>
<key>batchSize</key>
<value>4</value>
</entry>
</variables>
</processGroups>
</snippet>
<timestamp>01/15/2022 02:19:55 CST</timestamp>
</template>
01-14-2022
11:34 AM
Here is a way to implement this algorithm without writing any Groovy/Java/Python/etc. script.
1. Manually design an iteration loop to prepare the paged/batched SQL string.
2. Inside that loop, manipulate the pageNumber (the loop index) to prepare each SQL statement accordingly.
For instance, we can use a JOLT specification in a JoltTransformJSON processor to group the IDs into an IDList array; the result looks like [123,124,125,...]. After that, we loop over IDList and take a predefined range of IDs on each pass, based on the batchSize/pageSize.
For the 1st loop we get the 1st page's filter scope, which covers the IDs in
('IDList[0]', ..., 'IDList[pageSize*1-1]')
For the 2nd loop we get the 2nd page's filter scope, which covers the IDs in
('IDList[pageSize*(2-1)]', ..., 'IDList[pageSize*2-1]')
For a middle page, the filter scope is
('IDList[pageSize*(pageNum-1)]', ..., 'IDList[pageSize*pageNum-1]')
For the last page, it becomes
('IDList[pageSize*(pageNum-1)]', ..., 'IDList[IDListLength-1]')
This gives us the IN-condition part of each batched SQL statement, ready to be used in the WHERE clause. You may refer to the following link and its attached XML template file for more details. Hope this helps.
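For anyone who wants to sanity-check the page boundaries outside NiFi, the loop above can be sketched in a few lines of Python. This is only an illustration of the arithmetic, not part of the flow: the function names (extract_id_list, total_pages, in_clauses) are made up for this example, and the sample rows and page size of 4 are assumed to mirror the test data in the template.

```python
from typing import Iterator, List


def extract_id_list(rows: List[dict]) -> list:
    """Collect every row's "ID" into one array, like the JOLT shift step."""
    return [row["ID"] for row in rows]


def total_pages(id_count: int, page_size: int) -> int:
    """Number of pages: the quotient, plus one if there is a remainder."""
    pages, remainder = divmod(id_count, page_size)
    return pages if remainder == 0 else pages + 1


def in_clauses(id_list: list, page_size: int) -> Iterator[str]:
    """Yield the quoted IN (...) list for each page, first to last."""
    for page_num in range(1, total_pages(len(id_list), page_size) + 1):
        start = page_size * (page_num - 1)
        # The slice end is exclusive, so the last page is simply shorter.
        chunk = id_list[start:start + page_size]
        yield "(" + ",".join(f"'{item}'" for item in chunk) + ")"


# Hypothetical sample input: five rows, paged four at a time.
rows = [{"ID": i, "params": "xxx"} for i in range(123, 128)]
clauses = list(in_clauses(extract_id_list(rows), page_size=4))
for clause in clauses:
    print("SELECT * FROM <TABLE_NAME> WHERE ID IN " + clause)
```

With five IDs and a page size of 4, this yields two statements: one for the first four IDs and one for the remaining ID. In NiFi itself the same split is done by the rules in the UpdateAttribute processors rather than by a script.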