Extracting state from GenerateTableFetch processor
Labels: Apache NiFi
Created 08-25-2022 06:31 PM
Hi,
We are migrating data from one database to another using NiFi. We use the GenerateTableFetch processor for incremental fetches by setting a column in the Maximum-value Columns property. For logging purposes, I need to extract the state of the GenerateTableFetch processor and insert it into a table. By state I mean the maximum value of the 'Max-Value column' that the processor stores as its state. Can someone please explain how I can extract the processor state?
Thank you.
Created 08-26-2022 07:42 PM
@nk20 ,
You can use the NiFi API to extract the current state of any stateful processor if you need this.
See the example below for a ListFile processor (the same call works for GenerateTableFetch; only the processor ID changes):
# Request an access token (-k skips TLS certificate verification)
access_token=$(curl -k \
  -X POST \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'username=admin&password=supersecret1' \
  "https://localhost:8443/nifi-api/access/token")
# Fetch the state of the processor with the given ID
curl -k \
  -H "Authorization: Bearer $access_token" \
  "https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state"
{
  "componentState": {
    "componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
    "stateDescription": "After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.",
    "clusterState": {
      "scope": "CLUSTER",
      "totalEntryCount": 0,
      "state": []
    },
    "localState": {
      "scope": "LOCAL",
      "totalEntryCount": 6,
      "state": [
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/129",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/130",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        }
      ]
    }
  }
}
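If you want to do the same from a script and log the values, here is a rough Python sketch of the two calls above plus a small helper that flattens the response. The function names (fetch_processor_state, state_entries) and the verify_tls parameter are my own and not part of NiFi; the endpoints and the JSON layout are the ones shown in this post. Treat it as a starting point under those assumptions rather than a finished implementation.

```python
import json
import ssl
import urllib.parse
import urllib.request


def fetch_processor_state(base_url, username, password, processor_id, verify_tls=True):
    """Get a token from /access/token, then return the parsed /processors/{id}/state JSON.

    base_url is assumed to end in /nifi-api, e.g. https://localhost:8443/nifi-api.
    """
    context = ssl.create_default_context()
    if not verify_tls:
        # Same effect as curl -k: skip certificate checks (test setups only).
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    # Form-encoded credentials, as in the curl -d example.
    credentials = urllib.parse.urlencode(
        {"username": username, "password": password}
    ).encode()
    token_request = urllib.request.Request(
        f"{base_url}/access/token", data=credentials, method="POST"
    )
    with urllib.request.urlopen(token_request, context=context) as response:
        token = response.read().decode().strip()

    state_request = urllib.request.Request(
        f"{base_url}/processors/{processor_id}/state",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(state_request, context=context) as response:
        return json.load(response)


def state_entries(state_json):
    """Flatten cluster and local state into (scope, node_address, key, value) tuples."""
    component = state_json["componentState"]
    rows = []
    for section in ("clusterState", "localState"):
        scoped = component.get(section) or {}
        for entry in scoped.get("state") or []:
            rows.append(
                (
                    scoped.get("scope"),
                    entry.get("clusterNodeAddress"),  # None if the field is absent
                    entry["key"],
                    entry["value"],
                )
            )
    return rows
```

Each tuple returned by state_entries can be inserted as one row in your logging table. The key names depend on the processor, so check what your GenerateTableFetch processor actually returns before filtering on a particular key.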
Cheers,
André
Created 08-26-2022 09:00 AM
Hi,
Doesn't every record you pull through GenerateTableFetch already include the value of the Max-Value column?
Created 09-04-2022 04:19 PM
@araujo Thank you for the help. It works.
