Member since
01-14-2022
14
Posts
6
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4903 | 02-14-2022 03:40 AM | |
2321 | 01-16-2022 09:48 PM |
02-14-2022
03:40 AM
1 Kudo
Thank you for your question. You may try using UpdateAttribute Processor's stateful value to deal with the incoming flow files in a batch mode. ============================ Here is the settings for UpdateAttribute ============================ Under the Advanced Mode of UpdateAttribute Processor: Set two rules as below: R0 -> initializeBatchIndex Conditions: ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})})} Actions (add fragment related attributes): fragment.count ${batchSize} fragment.identifier (For each batch, it should generate a new UUID as the identifier) ${UUID()} fragment.index ${getStateValue('fragment.index'):plus(1):mod(${batchSize})} R1 -> Iterations Conditions: ${getStateValue("fragment.index"):equals(-1):or(${getStateValue('fragment.index'):plus(1):ge(${batchSize})}):not()} Actions (add fragment related attributes): fragment.count(This parameter may be optional as it always be the same size around one specific batch test ) ${getStateValue('fragment.count')} fragment.identifier ${getStateValue('fragment.identifier')} fragment.index ${getStateValue('fragment.index'):plus(1):mod(${batchSize})} NOTE: Before that, we can set a Variables in your current Process Group( right click en empty area inside your process group, select variables, and add a variable named batchSize, with proper merged count you wanna set) The result of the merged flow files would be merged via the same fragment.identifier. Please let me know if this helps. Thanks & Regards, Oliver Gong
... View more
01-17-2022
06:46 AM
1 Kudo
Glad to hear that works for you! Though we can split the raw csv or json content into a smaller size just in case of OOM issue when doing the shifting stuff on JoltTransformJSON, it may cause other issue like: - result from JoltTransformJSON is not complete. That means, some part of the same user data ( with different bill_date) may be wrapped in the other flow files. In such case, we will need to merge them back as a whole. - It would be better if we can keep raw csv data in some DB table. Then drag out the data rows with a specified limit--> we can use "split pages" way to query on such table, - We can then easily fetch a logical completed data to do the rest shift things. With such completed result, we don't need to worry about the data is not an info-incomplete one.
... View more
01-17-2022
01:55 AM
Thank you for your question. As you mentioned, the output flow file you want get should be with two attributes. Here you can try following: 1. Use EvaluateJsonPath processor, add two properties and configure it using JSONPath like below: Details on JSONPath, you may refer to https://github.com/json-path/JsonPath 2. Or you may try to define some customized script to get the expected output dynamically. e.g Using ExecuteScript with the corresponding module lib settings to indicate path of referenced jars. - Script Engine: Groovy - Module Directory: System Path which jars reside (or without specifying this path, you can also configure the JVM level to load it, either you set the NiFi bootstrap or place the jars under the bootstrap extended lib directory.) import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.logging.ComponentLog
import groovy.json.JsonSlurper
import com.fasterxml.jackson.databind.ObjectMapper
//Specify the module directory or configure the jvm classpath for current script to load and use such jars(jackson-annotations-2.9.0.jar,jackson-core-2.9.5.jar,jackson-databind-2.9.5.jar)
flowFile = session.get()
if(!flowFile) return
def inputStr=''
def jsonSlurper = new JsonSlurper()
try {
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
inputStr=IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
def map = jsonSlurper.parseText(inputStr)
ObjectMapper mapper = new ObjectMapper()
for (entry in map) {
log.info( "JSONKey: ${entry.key} = JSONValue: ${entry.value}")
flowFile = session.putAttribute(flowFile, "${entry.key}", "${mapper.writeValueAsString(entry.value)}")
}
def outputStr=""
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(outputStr.toString().getBytes('utf-8'))
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
} catch(e) {
flowFile = session.putAttribute(flowFile, 'errorMsg', "${e.toString()}")
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(e.toString().getBytes('utf-8'))
} as OutputStreamCallback)
session.transfer(flowFile, REL_FAILURE)
session.commit()
} ============ Additional Infos ============ You may go and visit below links: 1. For JSONPath -> https://github.com/json-path/JsonPath 2. For Apache NiFi scripts -> https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922 3. For jackson-databind -> https://github.com/FasterXML/jackson-databind Hope this helps. Just feel free to contact me if you have any questions. Thanks, Oliver Gong
... View more
01-16-2022
11:13 PM
2 Kudos
Attached the corresponding screenshots to cover the detail on implementation. 0. The whole data flow is like: 1.GenerateFlowFile: 2. ConvertRecord: 2.1 CSV Reader(LG CSVReader): 2.2 JsonRecordSetWriter (LG JsonRecordSetWriter): 3. JoltTransformJSON 3.1 JoltTransformJSON - Advanced Mode: Should you have any questions, just feel free to contact me. Thanks, Oliver Gong
... View more
01-16-2022
09:48 PM
1 Kudo
Thank you for your question. We can try following steps to make it work as expected. 1. Convert csv to json ( you may use ConvertRecord ), to get JSON as below [{
"mobile": "9876543210",
"username": "John",
"bill_id": "AB10",
"bill_date": "2020-12-23",
"product_id": "X34",
"product_price": "500"
}, {
"mobile": "9876543210",
"username": "John",
"bill_id": "AB10",
"bill_date": "2020-12-23",
"product_id": "X35",
"product_price": "230"
}, {
"mobile": "9876543210",
"username": "John",
"bill_id": "AB22",
"bill_date": "2020-11-14",
"product_id": "X89",
"product_price": "700"
}, {
"mobile": "9999999999",
"username": "Alice",
"bill_id": "AC54",
"bill_date": "2019-12-20",
"product_id": "X10",
"product_price": "109"
}] 2. We will then use JoltTransformJSON to make the shift stuff, the specification of such JOLT is listed as: [ {
"operation" : "shift",
"spec" : {
"*" : {
"*" : "@(1,username)@(1,mobile)@(1,bill_id)@(1,bill_date)@(1,product_id).&"
}
}
}, {
"operation" : "shift",
"spec" : {
"*" : {
"$" : "users.[#2].username",
"*" : {
"$" : "users.[#3].mobile",
"*" : {
"*" : {
"$" : "users.[#5].bills.[#3].bill_date",
"$1" : "users.[#5].bills.[#3].bill_id",
"*" : {
"product_*" : "users.[#6].bills.[#4].products.[#2].&"
}
}
}
}
}
}
} ] NOTE: - For the JOLT spec, we gonna group the raw data into organic fields, which means we will get the data which are grouped by "username + mobile + bill_id +bill_date + product_id " fields. - After that, we get the phase-1 shifted data that are ordered by unique keys. - For the phase-2 shift, we gonna list out the data as predefined logic which is inferred from your attached sample output. 3. For details on JOLT spec, you may refer to the following link: https://jolt-demo.appspot.com/ Hope this helps. Thanks Oliver Gong
... View more
01-14-2022
02:40 PM
1 Kudo
Not sure if this issue persist. Attached is one way, using only one-phase shift operation: 1. date and env are flow file attributes which should be predefined when debugging under the JOLTTransformJSON Advanced Mode. 2. When dealing with JOLT Spec, we can predefine some constant which starts with "#" for the unexist value scope "#<constant>", in that way we can then handle the rest match-and-shift work . [ {
"operation" : "shift",
"spec" : {
"logTypeCode" : {
"FIN" : {
"#END_OF_TESTING" : {
"$":"body.&3",
"#END_OF_TESTING":"customValue.logTypeCode"
}
}
},
"*" : "body.&",
"#${date}" : "header.date",
"#${env}" : {
"$" : "customValue.ENV"
}
}
} ]
==============
OR
==============
[ {
"operation" : "shift",
"spec" : {
"logTypeCode" : {
"FIN" : {
"#END_OF_TESTING" : {
"$":"body.logTypeCode",
"#END_OF_TESTING":"customValue.logTypeCode"
}
}
},
"*" : "body.&",
"#${date}" : "header.date",
"#${env}" : {
"$" : "customValue.ENV"
}
}
} ] Hope this helps. Thanks
... View more
01-14-2022
01:57 PM
For other ways to handle this, you can also give a try on below way: 1. Add a Router( RouteOnAttribute, before that, you may need to extract the Life Object from the incoming raw json to flow file attribute), then dispatch the two different types into separate biz flows to deal with the transformation. 2. Another trial could be script coding, you may also write personal script code to implement this.
... View more
01-14-2022
01:07 PM
We can use the chain mode with below JOLT specification: 1. Formalize the two different JSON input type into the same JSON format during phase 1. During the phase-1 shift:
[{
"operation": "shift",
"spec": {
"*": "&", --Comments: this is use too keep Name and Status elements as their original positions.
"Life": {
"Sport": {
"@1": "Life.[]" --Comments: this is used for the Single Object (Life), which will then be turned into array via this spec settings.
},
"0": {
"@1": {
"*": "&3.[&]"--Comments: if Life is comming as an array type, the "0" would be matched expectively, then we do the rest shift stuff, to keep the Life Object as array type orignally.
}
}
}
}
} NOTE: "--Comments:... " in the above code is only intended for personal comments, please do not treat it as the valid way of JOLT SPEC denotation, as it won't pass the SPEC Validation if those chars are include during validation phase of JOLT SPEC. 2. The second shift during phase-2 of the chain will take over the rest transformation, flatting the array type Life's sub elements and group each one of them as array including single object ( which is not array type after phase-1 shifting, to verify this, you can manually remove the '"[]" of "@": "&1[]" on the phase-2 shift ). ========================================= The whole content of JOLT Spec is listed as below: ========================================= [{
"operation": "shift",
"spec": {
"*": "&",
"Life": {
"Sport": {
"@1": "Life.[]"
},
"0": {
"@1": {
"*": "&3.[&]"
}
}
}
}
}, {
"operation": "shift",
"spec": {
"Life": {
"*": {
"*": {
"@": "&1[]"
}
}
},
"*": "&"
}
}] Hope this helps. Thanks
... View more
01-14-2022
11:50 AM
Perhaps we could first read the flow file content in json format, and then parsing it as json object. ( e.g. if using groovy script, we might use JsonSlurper to deal with such flow file content of valid JSON format. ) Then the rest of the thing could be dealing with the iteration loop for its ID array, and following the same logic to distinguish the different pages.
... View more
01-14-2022
11:37 AM
Attached the xml template file for your reference: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-15
</description>
<groupId>0a50e507-017e-1000-2e5c-c691a8dbb45b</groupId>
<name>BatchPreSQL</name>
<snippet>
<processGroups>
<id>eff4ec5e-a8ab-3310-0000-000000000000</id>
<parentGroupId>5511af2f-6c32-35cc-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<additions/>
<comments>BatchPreSQL
Author: Oliver Gong
DATE: 2022-01-14
</comments>
<contents>
<connections>
<id>15c43a7e-4e9a-39c0-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>30fdeb4d-c403-3862-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>97ca5515-e845-3e39-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>643.1160941528465</x>
<y>1122.8689543715004</y>
</bends>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>9874061c-5dd6-3fd9-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>74dbf5d9-50b9-3055-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>e271b74a-2af3-3fcf-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>eff4ec5e-a8ab-3310-0000-000000000000</groupId>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>74dbf5d9-50b9-3055-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>386.6160941528465</x>
<y>1410.3689543715004</y>
</position>
</funnels>
<labels>
<id>087856d3-c058-355e-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>94.61609415284647</x>
<y>782.3689543715004</y>
</position>
<height>255.0</height>
<label>1. Generate a request body to mock the client’s json request as following:
[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>528.0</width>
</labels>
<labels>
<id>4a9b240d-8ba7-3852-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>-30.383905847153528</x>
<y>1472.3689543715004</y>
</position>
<height>439.0</height>
<label>Description on each condition scope of the spawn SQLs
==============================================
The index of data from IDList arrays is listed like
1stPage: [FROM 0 TO pageSize-1]
2ndPage: [FROM pageSize TO 2*pageSize-1]
...
MiddlePage: [FROM (currentPage-1)*pageSize TO pageNumber*pageSize-1]
...
LastPage: [FROM (currentPage-1)*pageSiz TO IDListLength-1 ]
As per the JSONPath grammar, to slice the array into a sub array,
the specification we need to know is that :
when using [I_START:I_END] JSONPath to slice the array,
-> the data we gonna get is FROM index I_START TO I_END -1
which means the index we specified after ":" would not be included to the slicing result. </label>
<style>
<entry>
<key>font-size</key>
<value>24px</value>
</entry>
</style>
<width>1040.0</width>
</labels>
<labels>
<id>9e75a5d0-b823-3548-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>-414.3839058471535</x>
<y>-6.6310914048667655</y>
</position>
<height>732.0</height>
<label>This module(Processor Group) is going to demonstrate the way on
how to prepare a batched SQLs when trying to look up a range of data in DB.
Once you finish importing this module, here is the guide on
+++++++++++++++++++++++
HOW TO USE THIS MODULE:
1. Start the whole module's processors, check the content/attribute of the flowfile coming from the sub module(01 PrepareBatchedSQLStr)
2. Check the batched sqlStr.
+++++++++++++++++++++++
You can try below stuff, if you'd like to do some further validation on this algorithm:
1. CHANGE the module variable named batchSize.
With different batchSize settings, you would see the sqlStr being prepared accordingly.
(
A. Right click on empty area on the canvas of current Module--BatchPreSQL,
B. Then select and click "Variables" button to jump into Variables setting page.
C. Set variable-batchSize with other int values(postive int number exclude 0) to configure the pageSize of current algorithm.
1. batchSize > IDListLength
2. batchSize = IDListLength
3. batchSize < IDListLength (postive int number exclude 0 or you may use abs EL function to support the negative values)
D. Run the whole biz flow to verify such result.
)
2. You may also apply with larger raw json array to run the biz flow.
Please feel free to let me know if you have any question.</label>
<style>
<entry>
<key>background-color</key>
<value>#53b3d9</value>
</entry>
<entry>
<key>font-size</key>
<value>24px</value>
</entry>
</style>
<width>1514.0</width>
</labels>
<processGroups>
<id>56e121fa-15d6-3fd4-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>217.61609415284647</x>
<y>1104.3689543715004</y>
</position>
<additions/>
<comments></comments>
<contents>
<connections>
<id>07a8f12b-7736-3a6c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>13b22f01-b24d-359f-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>1736190e-f132-3dde-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name>Finish Batch Loop</name>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>37a04d0a-9f84-3813-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>1. Inside the Batch Loop</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4a4cc1bc-c93c-37f3-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4eef65de-4c95-3ca3-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5cd2322c-6cf8-3117-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>660fbef2-894b-3ba6-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6a063a4c-e0e0-3f3f-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>2428e910-0762-38e2-0000-000000000000</id>
<type>FUNNEL</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>73d4d5b3-ed4d-33ef-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>84fbfa68-f4f4-3ff8-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<type>INPUT_PORT</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>955217f3-8048-314d-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b447a208-a363-3f24-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b5babaa5-ec72-3496-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b97777ac-b7ee-30e2-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>97407168-ea79-32f1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>fd991255-d565-30f1-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<type>OUTPUT_PORT</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>56e121fa-15d6-3fd4-0000-000000000000</groupId>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>2428e910-0762-38e2-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1177.000005285659</x>
<y>694.9999968519692</y>
</position>
</funnels>
<inputPorts>
<id>8901cdca-8fdd-3d01-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>486.000005285659</x>
<y>46.99999685196917</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>IN</name>
<state>RUNNING</state>
<type>INPUT_PORT</type>
</inputPorts>
<labels>
<id>29a66daf-8ba8-387c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1070.6160941528465</x>
<y>945.3690154066567</y>
</position>
<height>124.0</height>
<label>JSONPath
[start:end]
Selects array elements from the start index and up to,
but not including, end index.
[start:]
If end is omitted,
selects all elements from start until the end of the array. Returns a list.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>393.0</width>
</labels>
<labels>
<id>69a045b5-f89a-3bdd-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>707.000005285659</x>
<y>243.99999685196917</y>
</position>
<height>47.0</height>
<label>Here we are going to extract the IDList which contains the whole ID(s) from the raw data.</label>
<style>
<entry>
<key>font-size</key>
<value>18px</value>
</entry>
</style>
<width>732.0</width>
</labels>
<outputPorts>
<id>3e3e7f3b-3d3a-3b8d-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>1418.7386527465965</x>
<y>683.4886290541176</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Finish Preparing Batch SQL strs</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<outputPorts>
<id>6ff09bae-c4a1-38c5-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>-250.38390584715353</x>
<y>340.3689543715004</y>
</position>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Exceptions</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<outputPorts>
<id>b8c0272c-e161-3456-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>835.7386527465965</x>
<y>1555.4886290541176</y>
</position>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<name>Go2RemainBiz</name>
<state>RUNNING</state>
<type>OUTPUT_PORT</type>
</outputPorts>
<processors>
<id>19e51c58-e665-33c7-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>883.6160941528465</x>
<y>1151.3689543715004</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>If there only 1 element in the fetching scope, we would surround it with ('<LastPage_OnlyOneExpectedValue2Fetch>')</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>sqlStr</key>
<value>
<name>sqlStr</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>sqlStr</key>
<value>SELECT
*
FROM
<TABLE_NAME>
WHERE
ID IN ${sqlStrBatchCondition:contains(','):not()
:ifElse(
"('${sqlStrBatchCondition}')",
${sqlStrBatchCondition}
)
}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>prepareSQLStrs</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>4ee2df31-2f9b-34d5-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>564.7386527465965</x>
<y>439.4885985365395</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>IDList</key>
<value>
<name>IDList</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>json</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>IDList</key>
<value>$.IDList</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>60fb5121-b8bc-394a-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>893.7386527465965</x>
<y>1365.4886290541176</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>==========
TotalPage: ${totalPage}
CurrentPage: ${currentPage}
PageSize:${batchSize}
==========
SQL is:
${sqlStr}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Always Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PrintResult</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>6837ac99-5ce2-3c47-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>245.00000528565897</x>
<y>227.99999685196917</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>msg</key>
<value>
<name>msg</name>
</value>
</entry>
<entry>
<key>msgCode</key>
<value>
<name>msgCode</name>
</value>
</entry>
<entry>
<key>status</key>
<value>
<name>status</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>msg</key>
<value>Failed to extract IDList from raw data via JOLT.</value>
</entry>
<entry>
<key>msgCode</key>
<value>01:01</value>
</entry>
<entry>
<key>status</key>
<value>failed</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>01 PrepareExceptionInfo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>97407168-ea79-32f1-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>868.7386527465965</x>
<y>670.4886290541176</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>1. Inside the Batch Loop</key>
<value>
<name>1. Inside the Batch Loop</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>1. Inside the Batch Loop</key>
<value>${currentPage:le(${totalPage})}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>1. Inside the Batch Loop</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>9ef7cadf-476e-3437-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>247.00000528565897</x>
<y>439.9999968519692</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>msg</key>
<value>
<name>msg</name>
</value>
</entry>
<entry>
<key>msgCode</key>
<value>
<name>msgCode</name>
</value>
</entry>
<entry>
<key>status</key>
<value>
<name>status</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>msg</key>
<value>Failed to extract IDList from flowfile content to attribute.</value>
</entry>
<entry>
<key>msgCode</key>
<value>01:02</value>
</entry>
<entry>
<key>status</key>
<value>failed</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>02 PrepareExceptionInfo</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>a37238fd-6b1a-334b-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>558.6317191528465</x>
<y>227.36889333634417</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_11005</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments>Here we are going to extract the IDList which contains the whole ID(s) from the raw data.</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>jolt-transform</key>
<value>
<name>jolt-transform</name>
</value>
</entry>
<entry>
<key>jolt-custom-class</key>
<value>
<name>jolt-custom-class</name>
</value>
</entry>
<entry>
<key>jolt-custom-modules</key>
<value>
<name>jolt-custom-modules</name>
</value>
</entry>
<entry>
<key>jolt-spec</key>
<value>
<name>jolt-spec</name>
</value>
</entry>
<entry>
<key>Transform Cache Size</key>
<value>
<name>Transform Cache Size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>jolt-transform</key>
<value>jolt-transform-chain</value>
</entry>
<entry>
<key>jolt-custom-class</key>
</entry>
<entry>
<key>jolt-custom-modules</key>
</entry>
<entry>
<key>jolt-spec</key>
<value>[{
"operation": "shift",
"spec": {
"*":{
"ID":"IDList"
}
}
}
]</value>
</entry>
<entry>
<key>Transform Cache Size</key>
<value>1</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>JoltTransformJSON</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.JoltTransformJSON</type>
</processors>
<processors>
<id>e4d04817-e7d4-34a9-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>577.7386527465965</x>
<y>946.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>currentPage</key>
<value>
<name>currentPage</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>currentPage</key>
<value>${currentPage:plus(1)}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ScrollcurrentPage</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>ed04498f-82f7-37f7-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>874.7386527465965</x>
<y>947.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<annotationData><criteria>
<flowFilePolicy>USE_ORIGINAL</flowFilePolicy>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[0:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}
</value>
</actions>
<conditions>
<expression>${currentPage:equals(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${IDListLength:le(${pageSize})}</expression>
<id>fa0053b2-a183-4f33-9788-3fe68ccae5f1</id>
</conditions>
<id>c0bb8d5c-cb73-40f5-8323-632122cb75ac</id>
<name>1.1 First Page &amp;&amp; IDListLength &lt;= pageSize </name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[0:${pageSize}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${IDListLength:le(${pageSize}):not()}</expression>
<id>fa0053b2-a183-4f33-9788-3fe68ccae5f1</id>
</conditions>
<id>b351555e-2dd8-4303-bd05-6aff43731a38</id>
<name>1.2 First Page &amp;&amp; IDListLength &gt; pageSize </name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${currentPage:minus(1):multiply(${pageSize}):plus(${pageSize})}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:lt(${totalPage})}</expression>
<id>46a93c40-dac1-4502-8dac-ceb2b12a2756</id>
</conditions>
<conditions>
<expression>${currentPage:gt(1)}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<id>e19f2512-98be-47fa-8e0b-76339348f853</id>
<name>2. MiddlePage</name>
</rules>
<rules>
<actions>
<attribute>sqlStrBatchCondition</attribute>
<id>570f5b7b-f835-4c8d-8f5d-e08deda1c7c1</id>
<value>${IDList:jsonPath("$[${currentPage:minus(1):multiply(${pageSize})}:${IDListLength}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}</value>
</actions>
<conditions>
<expression>${currentPage:equals(${totalPage})}</expression>
<id>97d055e9-12d2-439c-93a4-8642f0bebd6d</id>
</conditions>
<conditions>
<expression>${totalPage:equals(1):not()}</expression>
<id>46a93c40-dac1-4502-8dac-ceb2b12a2756</id>
</conditions>
<id>0d277b81-3121-4d87-844d-4dcc1325fb2c</id>
<name>3. LastPage &amp;&amp; totalPage !=1</name>
</rules>
</criteria></annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments>SELECT
*
FROM
<TABLE_NAME>
WHERE
ID IN ${index:equals(${specificFlag:replaceEmpty('')})
:ifElse(
${IDList:jsonPath("$[${index:multiply(${batchSize})}:${IDListLength:minus(1)}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
},
${IDList:jsonPath("$[${index:multiply(${batchSize})}:${index:multiply(${batchSize}):plus(${batchSize})}]")
:replaceAll(',',"','"):replaceAll('\[',"('"):replaceAll('\]',"')")
}
)
}</comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>0 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>prepareWhereClauseConditionPart</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f784d98d-4eaf-395c-0000-000000000000</id>
<parentGroupId>56e121fa-15d6-3fd4-0000-000000000000</parentGroupId>
<position>
<x>566.7386527465965</x>
<y>666.4886290541176</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<annotationData><criteria>
<flowFilePolicy>USE_ORIGINAL</flowFilePolicy>
<rules>
<actions>
<attribute>totalPage</attribute>
<id>0c63c6dd-1748-4e21-8e35-e8645341dd48</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize})}</value>
</actions>
<conditions>
<expression>${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0')}</expression>
<id>20c60e9a-db68-44b1-a22b-e3f83a57dd8a</id>
</conditions>
<id>2fb2c755-2084-409d-b48e-176751d78930</id>
<name>1. IDList Length Mod batchSize = 0</name>
</rules>
<rules>
<actions>
<attribute>specialPage</attribute>
<id>4f763b71-bb46-472c-8364-86e6fbd293fe</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize})}</value>
</actions>
<actions>
<attribute>totalPage</attribute>
<id>0c63c6dd-1748-4e21-8e35-e8645341dd48</id>
<value>${IDList:jsonPath('$.length()'):divide(${batchSize}):plus(1)}</value>
</actions>
<conditions>
<expression>${IDList:jsonPath('$.length()'):mod(${batchSize}):equals('0'):not()}</expression>
<id>20c60e9a-db68-44b1-a22b-e3f83a57dd8a</id>
</conditions>
<id>403feae6-e423-4b8b-b15a-e8a93dac5fd7</id>
<name>2. 1. IDList Length Mod batchSize != 0</name>
</rules>
</criteria></annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>currentPage</key>
<value>
<name>currentPage</name>
</value>
</entry>
<entry>
<key>IDListLength</key>
<value>
<name>IDListLength</name>
</value>
</entry>
<entry>
<key>pageSize</key>
<value>
<name>pageSize</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>currentPage</key>
<value>1</value>
</entry>
<entry>
<key>IDListLength</key>
<value>${IDList:jsonPath('$.length()')}</value>
</entry>
<entry>
<key>pageSize</key>
<value>${batchSize}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>InitializeTheBatchLoop</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
</contents>
<name>01 PrepareBatchedSQLStr</name>
<variables/>
</processGroups>
<processors>
<id>30fdeb4d-c403-3862-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>351.6160941528465</x>
<y>863.3689543715004</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
<entry>
<key>mime.type</key>
<value>
<name>mime.type</name>
</value>
</entry>
<entry>
<key>test</key>
<value>
<name>test</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>[{
"ID":123,
"params":"xxx"
},{
"ID":124,
"params":"xxx"
},{
"ID":125,
"params":"xxx"
},{
"ID":126,
"params":"xxx"
},{
"ID":127,
"params":"xxx"
}]</value>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>mime.type</key>
<value>application/json;charset=utf-8</value>
</entry>
<entry>
<key>test</key>
<value>${literal('ab')
:toUpper()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>e333c1a6-2f02-341e-0000-000000000000</id>
<parentGroupId>eff4ec5e-a8ab-3310-0000-000000000000</parentGroupId>
<position>
<x>824.6160941528465</x>
<y>1161.3689543715004</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.7.1-2.0_09000</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>log-level</key>
<value>
<name>log-level</name>
</value>
</entry>
<entry>
<key>log-prefix</key>
<value>
<name>log-prefix</name>
</value>
</entry>
<entry>
<key>log-message</key>
<value>
<name>log-message</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>log-level</key>
<value>info</value>
</entry>
<entry>
<key>log-prefix</key>
<value>### RESULT ###</value>
</entry>
<entry>
<key>log-message</key>
<value>${status:replaceEmpty('Success')}${msg}${msgCode}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>LogMessage</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.LogMessage</type>
</processors>
</contents>
<name>BatchPreSQL</name>
<variables>
<entry>
<key>batchSize</key>
<value>4</value>
</entry>
</variables>
</processGroups>
</snippet>
<timestamp>01/15/2022 02:19:55 CST</timestamp>
</template>
... View more