Created 08-25-2022 06:31 PM
Hi,
We are migrating data from one db to another using NIFI. We are using GenerateTableFetch processor for the incremental fetch by setting a column in Max-Value columns. For logging purpose, i need to extract the state of the GenerateTableFetch processor and insert in a table. By state I mean the max value of the 'Max-Value column' that is captured as the state of the processor. Can someone please help with how can i extract the processor state?
Thank you.
Created 08-26-2022 07:42 PM
@nk20 ,
You can use the NiFi API to extract the current state of any stateful processor if you need this.
See the example below for a PutFile processor:
access_token=$(curl -k \
-X POST \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'username=admin&password=supersecret1'
"https://localhost:8443/nifi-api/access/token")
curl -k \
-H "Authorization: Bearer $access_token" \
"https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state"
{
"componentState": {
"componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
"stateDescription": "After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.",
"clusterState": {
"scope": "CLUSTER",
"totalEntryCount": 0,
"state": []
},
"localState": {
"scope": "LOCAL",
"totalEntryCount": 6,
"state": [
{
"key": "id.0",
"value": "/tmp/hsperfdata_nifi/129",
"clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
"clusterNodeAddress": "nifi1:8443"
},
{
"key": "id.0",
"value": "/tmp/hsperfdata_nifi/130",
"clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
"clusterNodeAddress": "nifi0:8443"
},
{
"key": "listing.timestamp",
"value": "1661567544049",
"clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
"clusterNodeAddress": "nifi1:8443"
},
{
"key": "listing.timestamp",
"value": "1661567541525",
"clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
"clusterNodeAddress": "nifi0:8443"
},
{
"key": "processed.timestamp",
"value": "1661567544049",
"clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
"clusterNodeAddress": "nifi1:8443"
},
{
"key": "processed.timestamp",
"value": "1661567541525",
"clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
"clusterNodeAddress": "nifi0:8443"
}
]
}
}
}
Cheers,
André
Created 08-26-2022 09:00 AM
Hi ,
Is not every record you pull through the GenerateTableFetch will have the max value for the max column?
Created 08-26-2022 07:42 PM
@nk20 ,
You can use the NiFi API to extract the current state of any stateful processor if you need this.
See the example below for a PutFile processor:
access_token=$(curl -k \
-X POST \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'username=admin&password=supersecret1'
"https://localhost:8443/nifi-api/access/token")
curl -k \
-H "Authorization: Bearer $access_token" \
"https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state"
{
"componentState": {
"componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
"stateDescription": "After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.",
"clusterState": {
"scope": "CLUSTER",
"totalEntryCount": 0,
"state": []
},
"localState": {
"scope": "LOCAL",
"totalEntryCount": 6,
"state": [
{
"key": "id.0",
"value": "/tmp/hsperfdata_nifi/129",
"clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
"clusterNodeAddress": "nifi1:8443"
},
{
"key": "id.0",
"value": "/tmp/hsperfdata_nifi/130",
"clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
"clusterNodeAddress": "nifi0:8443"
},
{
"key": "listing.timestamp",
"value": "1661567544049",
"clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
"clusterNodeAddress": "nifi1:8443"
},
{
"key": "listing.timestamp",
"value": "1661567541525",
"clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
"clusterNodeAddress": "nifi0:8443"
},
{
"key": "processed.timestamp",
"value": "1661567544049",
"clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
"clusterNodeAddress": "nifi1:8443"
},
{
"key": "processed.timestamp",
"value": "1661567541525",
"clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
"clusterNodeAddress": "nifi0:8443"
}
]
}
}
}
Cheers,
André
Created 09-04-2022 04:19 PM
@araujo Thank you for the help.. It works.