Extracting state from GenerateTableFetch processor

Explorer

Hi,

We are migrating data from one database to another using NiFi. We use the GenerateTableFetch processor for incremental fetches, with a column set in the Max-Value Columns property. For logging purposes, I need to extract the state of the GenerateTableFetch processor and insert it into a table. By state I mean the maximum value of the Max-Value column, which the processor stores as its state. Can someone please explain how I can extract the processor state?

Thank you.


1 ACCEPTED SOLUTION

Super Guru

@nk20 ,

 

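You can use the NiFi REST API to read the current state of any stateful processor, including GenerateTableFetch.

The example below requests an access token and then fetches the state of a ListFile processor; substitute your own processor's ID in the URL: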

access_token=$(curl -k \
  -X POST \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'username=admin&password=supersecret1' \
  "https://localhost:8443/nifi-api/access/token")

curl -k \
  -H "Authorization: Bearer $access_token" \
  "https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state"

{
  "componentState": {
    "componentId": "dd24aaec-0182-1000-ffff-ffff9f128d94",
    "stateDescription": "After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.",
    "clusterState": {
      "scope": "CLUSTER",
      "totalEntryCount": 0,
      "state": []
    },
    "localState": {
      "scope": "LOCAL",
      "totalEntryCount": 6,
      "state": [
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/129",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "id.0",
          "value": "/tmp/hsperfdata_nifi/130",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "listing.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567544049",
          "clusterNodeId": "420e540a-ccd6-4e7c-bc25-c572f503b338",
          "clusterNodeAddress": "nifi1:8443"
        },
        {
          "key": "processed.timestamp",
          "value": "1661567541525",
          "clusterNodeId": "00e23669-130f-4e12-8a26-be3ab95923d4",
          "clusterNodeAddress": "nifi0:8443"
        }
      ]
    }
  }
}
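
If you only need the key/value pairs for logging, the response can be flattened on the command line. This is a minimal sketch, assuming `jq` is installed; `state_entries` is a hypothetical helper name, not part of NiFi. It reads a state response like the one above from stdin and prints one tab-separated line per entry (key, value, node address), looking in both `clusterState` and `localState`. For GenerateTableFetch I would expect the max value to show up as one of these entries, but check the exact key name and scope in your own response.

```shell
#!/usr/bin/env bash

# Hypothetical helper (not part of NiFi): flatten a processor state response.
# Reads the JSON from stdin and prints "key<TAB>value<TAB>node address"
# for every entry found in clusterState and localState.
# Assumes jq is available on the PATH.
state_entries() {
  jq -r '
    .componentState
    | (.clusterState.state // []) + (.localState.state // [])
    | .[]
    | [.key, .value, (.clusterNodeAddress // "")]
    | @tsv'
}

# Usage (same token and URL as in the example above):
#   curl -k -H "Authorization: Bearer $access_token" \
#     "https://localhost:8443/nifi-api/processors/dd24aaec-0182-1000-ffff-ffff9f128d94/state" \
#     | state_entries
```

Each output line can then be fed to whatever you use to write to your logging table.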

 

Cheers,

André


3 REPLIES

Super Guru

Hi,

Won't every record you pull through GenerateTableFetch already contain the value of the Max-Value column?

Explorer

@araujo Thank you for the help. It works.