Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Master Guru

Previous article demonstrated how to use nifi rest api to create a template. The objective for this article is to demonstrate how to use NiFi rest api to change a work flow on a secured cluster real time without service disruption or downtime.

13311-1.jpg

Use Case

A process group named CICD is a active data flow (data is flowing through it) and a new processor will be added at the end of a flow without application disruption or downtime. Comments in the picture above.

Steps

  1. Create Access Token
  2. Create Client ID
  3. Fetch Process Group attributes (process group which the changes will be applied to)
  4. Create LogAttribute Processor
  5. Fetch the UUID of the UpdateAttribute processor
    1. Why? Data will flow from this existing processor into newly added LogAttribute Processor
  6. Link LogAttribute & UpdateAttribute via success relationship
  7. Update LogAttribute to terminate on success relationship

Steps 1 & 2 & 3

These steps are described here so no reason to copy and paste.

Step 4 - Create LogAttribute Processor

payload

{
  "revision": {
    "clientId": "556f3dcf-c8d4-1145-0d1b-539ef89a01da",
    "version": 0
  },
  "component": {
    "type": "org.apache.nifi.processors.standard.LogAttribute",
    "name": "LogAttribute",
    "position": {
      "x": 1406.2231010674523,
      "y": 1243.8565539964334
    }
  }
}

API

/nifi-api/process-groups/YourProcessGroupUUID/processors

My process group UUID is

a522b679-015a-1000-0000-00003f33fd93

Post

curl -X POST -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Content-Length: 229" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":0},"component":{"type":"org.apache.nifi.processors.standard.LogAttribute","name":"LogAttribute","position":{"x":1406.2231010674523,"y":1243.8565539964334}}}' https://nifi.com:9091/nifi-api/process-groups/a522b679-015a-1000-0000-00003f33fd93/processors| gunzip

Returns large json payload which include UUID of the newly created processor (LogAttribute)

The new added processor will be displayed on the UI

13312-2.jpg

Step 5 - Fetch the UUID of the UpdateAttribute processor

Fetch the UUID of the UpdateAttribute processor. In the next step a link (via success relationship) between the newly created LogAttribute processor to UpdateAttribute will be established through their respective UUID

API

/nifi-api/flow/process-groups/YourProcessGroupUUID

Post

curl -X GET -H "Accept: */*" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" https://nifi.com:9091/nifi-api/flow/process-groups/a522b679-015a-1000-0000-00003f33fd93 | gunzip

This will return large json message

  "processGroupFlow": {
    "id": "a522b679-015a-1000-0000-00003f33fd93",
    ..........
      }
    },
    "flow": {
....
          "component": {
            "id": "08873906-c9ca-1190-8376-b863cef4e797",
....
            "name": "UpdateAttribute",
            "type": "org.apache.nifi.processors.attributes.UpdateAttribute",
            "state": "RUNNING",
            ...

The UUID for UpdateAttribute processor is (Yours will be different)

08873906-c9ca-1190-8376-b863cef4e797

Step 6 - Link LogAttribute & UpdateAttribute via success relationship

Payload

{
  "revision": {
    "clientId": "556f3dca-c8d4-1145-2ac4-3e59aa43f6ec",
    "version": 0
  },
  "component": {
    "name": "",
    "source": {
      "id": "08873906-c9ca-1190-8376-b863cef4e797",
      "groupId": "a522b679-015a-1000-0000-00003f33fd93",
      "type": "PROCESSOR"
    },
    "destination": {
      "id": "556f687a-c8d4-1145-0000-000032d88e41",
      "groupId": "a522b679-015a-1000-0000-00003f33fd93",
      "type": "PROCESSOR"
    },
    "selectedRelationships": [
      "success"
    ],
    "flowFileExpiration": "0 sec",
    "backPressureDataSizeThreshold": "1 GB",
    "backPressureObjectThreshold": "10000",
    "bends": [],
    "prioritizers": []
  }
}

Source is the existing processor (UpdateAttribute) we want the flow files from and the destination is the newly added processor (LogAttribute)

API

/nifi-api/process-groups/ProcessGroupUUID/connections

Post

curl -X POST -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9=" -H "Connection: keep-alive" -H "Content-Length: 522" -H "Content-Type: application/json" -H "Host: hnifi.com:9091" 





-H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":0},"component":{"name":"","source":{"id":"08873906-c9ca-1190-8376-b863cef4e797","groupId":"a522b679-015a-1000-0000-00003f33fd93","type":"PROCESSOR"},"destination":{"id":"556f687a-c8d4-1145-0000-000032d88e41","groupId":"a522b679-015a-1000-0000-00003f33fd93","type":"PROCESSOR"},"selectedRelationships":["success"],"flowFileExpiration":"0 sec","backPressureDataSizeThreshold":"1 GB","backPressureObjectThreshold":"10000","bends":[],"prioritizers":[]}}' https://nifi.com:9091/nifi-api/process-groups/a522b679-015a-1000-0000-00003f33fd93/connections | gunzip

Through the UI the newly added processor (LogAttribute) is linked to existing UpdateAttribute processor.

13313-3.jpg

Notice the LogAttribute has a yellow caution democratization on the top left. The processor requires action on the success relationship. For this example success relationship will be set to terminate..meaning no other work beyond LogAttribute is required.

Step 7 - Update LogAttribute to terminate on success relationship

Set auto terminate on success for the newly added processor

API

/nifi-api/processors/ProcessorUI

PUT

curl -X PUT -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Content-Length: 522" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"component":{"id":"556f687a-c8d4-1145-0000-000032d88e41","name":"LogAttribute","config":{"concurrentlySchedulableTaskCount":"1","schedulingPeriod":"0 sec","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","schedulingStrategy":"TIMER_DRIVEN","comments":"","autoTerminatedRelationships":["success"]},"state":"STOPPED"},"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":5}}' https://nifi.com:9091/nifi-api/processors/556f687a-c8d4-1145-0000-000032d88e41 | gunzip

The UI will show logattribute is in "stopped" status and the "Automatically Terminate Relationships" success is checked off.

13316-4.jpg

Step 8 - Start the processor

Payload

{
  "revision": {
    "clientId": "a556f3dce-c8d4-1145-ff5c-6c6f302bd804",
    "version": 5
  },
  "component": {
    "id": "556f5bc4-c8d4-1145-0000-000059ff8789",
    "state": "RUNNING"
  }
}

id = processor UUID

API

/nifi-api/processors/ProcessorUI

PUT

curl -X PUT -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Content-Length: 522" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":5},"component":{"id":"556f687a-c8d4-1145-0000-000032d88e41","state":"RUNNING"}}' https://nifi.com:9091/nifi-api/processors/556f687a-c8d4-1145-0000-000032d88e41 | gunzip

Picture belows shows the flow has been updated via rest api real time without any disruption or stoppage of workflow.

Note - All of the actions described in this article may be performed via UI.

13318-5.jpg

19,493 Views
Comments
avatar
Expert Contributor

Thanks @smanjee

Good job! I am looking forward Part II