Created on 03-06-2017 09:50 PM - edited 08-17-2019 01:57 PM
The previous article demonstrated how to use the NiFi REST API to create a template. The objective of this article is to demonstrate how to use the NiFi REST API to change a workflow on a secured cluster in real time, without service disruption or downtime.
Use Case
A process group named CICD contains an active data flow (data is flowing through it), and a new processor will be added at the end of the flow without application disruption or downtime. See the comments in the picture above.
Steps
Steps 1 & 2 & 3
These steps are described here, so there is no reason to copy and paste them.
Step 4 - Create LogAttribute Processor
Payload
{ "revision": { "clientId": "556f3dcf-c8d4-1145-0d1b-539ef89a01da", "version": 0 }, "component": { "type": "org.apache.nifi.processors.standard.LogAttribute", "name": "LogAttribute", "position": { "x": 1406.2231010674523, "y": 1243.8565539964334 } } }
API
/nifi-api/process-groups/YourProcessGroupUUID/processors
My process group UUID is
a522b679-015a-1000-0000-00003f33fd93
Post
curl -X POST -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Content-Length: 229" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":0},"component":{"type":"org.apache.nifi.processors.standard.LogAttribute","name":"LogAttribute","position":{"x":1406.2231010674523,"y":1243.8565539964334}}}' https://nifi.com:9091/nifi-api/process-groups/a522b679-015a-1000-0000-00003f33fd93/processors| gunzip
Returns a large JSON payload that includes the UUID of the newly created processor (LogAttribute).
The newly added processor will be displayed in the UI.
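The Step 4 request can also be prepared from a script. What follows is a minimal Python sketch, not the author's method: it builds the payload shown above and an authenticated request object without sending it. The helper names (processor_payload, build_request) and the BASE_URL and TOKEN constants are illustrative assumptions; the endpoint path and JSON fields come from this article.

```python
import json
import urllib.request

BASE_URL = "https://nifi.com:9091/nifi-api"  # host used throughout this article
TOKEN = "eyJhbGciOiJIUzI1NiJ9"               # truncated bearer token placeholder from the article


def processor_payload(client_id, processor_type, name, x, y):
    """Build the JSON body for creating a processor (version 0 for a new component)."""
    return {
        "revision": {"clientId": client_id, "version": 0},
        "component": {
            "type": processor_type,
            "name": name,
            "position": {"x": x, "y": y},
        },
    }


def build_request(method, path, body=None):
    """Prepare (but do not send) an authenticated JSON request (hypothetical helper)."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Authorization": "Bearer " + TOKEN, "Accept": "application/json"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


payload = processor_payload(
    "556f3dcf-c8d4-1145-0d1b-539ef89a01da",
    "org.apache.nifi.processors.standard.LogAttribute",
    "LogAttribute",
    1406.2231010674523,
    1243.8565539964334,
)
request = build_request(
    "POST", "/process-groups/a522b679-015a-1000-0000-00003f33fd93/processors", payload
)
# Sending is left out on purpose: urllib.request.urlopen(request) would perform the call,
# and the new processor's UUID would then be read from the JSON response.
```

Keeping payload construction separate from the HTTP call makes it easy to inspect the body before anything is changed on a live cluster.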
Step 5 - Fetch the UUID of the UpdateAttribute processor
Fetch the UUID of the UpdateAttribute processor. In the next step, a link (via the success relationship) between UpdateAttribute and the newly created LogAttribute processor will be established through their respective UUIDs.
API
/nifi-api/flow/process-groups/YourProcessGroupUUID
Get
curl -X GET -H "Accept: */*" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" https://nifi.com:9091/nifi-api/flow/process-groups/a522b679-015a-1000-0000-00003f33fd93 | gunzip
This will return a large JSON message:
"processGroupFlow": { "id": "a522b679-015a-1000-0000-00003f33fd93", .......... } }, "flow": { .... "component": { "id": "08873906-c9ca-1190-8376-b863cef4e797", .... "name": "UpdateAttribute", "type": "org.apache.nifi.processors.attributes.UpdateAttribute", "state": "RUNNING", ...
The UUID of the UpdateAttribute processor is (yours will be different)
08873906-c9ca-1190-8376-b863cef4e797
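Picking the UUID out of the large response by eye is error-prone, so a small lookup helps. The sketch below is an illustration only: find_processor_id is a hypothetical helper, and it assumes the processors are listed under processGroupFlow, then flow, then processors, each with a component object like the one in the excerpt above. The sample data is a trimmed stand-in for the real response.

```python
def find_processor_id(flow_response, processor_name):
    """Return the UUID of the first processor whose component name matches, else None."""
    flow = flow_response.get("processGroupFlow", {}).get("flow", {})
    for entity in flow.get("processors", []):
        component = entity.get("component", {})
        if component.get("name") == processor_name:
            return component.get("id")
    return None


# Trimmed stand-in for the response above; the "processors" list is an assumption.
sample_response = {
    "processGroupFlow": {
        "id": "a522b679-015a-1000-0000-00003f33fd93",
        "flow": {
            "processors": [
                {
                    "component": {
                        "id": "08873906-c9ca-1190-8376-b863cef4e797",
                        "name": "UpdateAttribute",
                        "type": "org.apache.nifi.processors.attributes.UpdateAttribute",
                        "state": "RUNNING",
                    }
                }
            ]
        },
    }
}

update_attribute_id = find_processor_id(sample_response, "UpdateAttribute")
print(update_attribute_id)
```

If several processors share a name, this returns only the first match, so matching on type as well may be safer in a busy process group.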
Step 6 - Link LogAttribute & UpdateAttribute via success relationship
Payload
{ "revision": { "clientId": "556f3dca-c8d4-1145-2ac4-3e59aa43f6ec", "version": 0 }, "component": { "name": "", "source": { "id": "08873906-c9ca-1190-8376-b863cef4e797", "groupId": "a522b679-015a-1000-0000-00003f33fd93", "type": "PROCESSOR" }, "destination": { "id": "556f687a-c8d4-1145-0000-000032d88e41", "groupId": "a522b679-015a-1000-0000-00003f33fd93", "type": "PROCESSOR" }, "selectedRelationships": [ "success" ], "flowFileExpiration": "0 sec", "backPressureDataSizeThreshold": "1 GB", "backPressureObjectThreshold": "10000", "bends": [], "prioritizers": [] } }
The source is the existing processor (UpdateAttribute) that we want the flow files from, and the destination is the newly added processor (LogAttribute).
API
/nifi-api/process-groups/YourProcessGroupUUID/connections
Post
curl -X POST -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9=" -H "Connection: keep-alive" -H "Content-Length: 522" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":0},"component":{"name":"","source":{"id":"08873906-c9ca-1190-8376-b863cef4e797","groupId":"a522b679-015a-1000-0000-00003f33fd93","type":"PROCESSOR"},"destination":{"id":"556f687a-c8d4-1145-0000-000032d88e41","groupId":"a522b679-015a-1000-0000-00003f33fd93","type":"PROCESSOR"},"selectedRelationships":["success"],"flowFileExpiration":"0 sec","backPressureDataSizeThreshold":"1 GB","backPressureObjectThreshold":"10000","bends":[],"prioritizers":[]}}' https://nifi.com:9091/nifi-api/process-groups/a522b679-015a-1000-0000-00003f33fd93/connections | gunzip
In the UI, the newly added processor (LogAttribute) is now linked to the existing UpdateAttribute processor.
Notice that LogAttribute has a yellow caution indicator at its top left: the processor requires action on its success relationship. For this example the success relationship will be set to auto-terminate, meaning no further work beyond LogAttribute is required.
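The Step 6 connection payload repeats the process group UUID for both endpoints, which is easy to get wrong when editing JSON by hand. As a rough sketch (connection_payload is a hypothetical helper, and the queue settings are simply the values used in this article), it can be assembled like this:

```python
import json


def connection_payload(client_id, group_id, source_id, destination_id, relationships):
    """Build the JSON body for connecting two processors in the same process group."""

    def endpoint(processor_id):
        # Both ends of the connection live in the same process group here.
        return {"id": processor_id, "groupId": group_id, "type": "PROCESSOR"}

    return {
        "revision": {"clientId": client_id, "version": 0},
        "component": {
            "name": "",
            "source": endpoint(source_id),
            "destination": endpoint(destination_id),
            "selectedRelationships": list(relationships),
            "flowFileExpiration": "0 sec",
            "backPressureDataSizeThreshold": "1 GB",
            "backPressureObjectThreshold": "10000",
            "bends": [],
            "prioritizers": [],
        },
    }


payload = connection_payload(
    client_id="556f3dcf-c8d4-1145-0d1b-539ef89a01da",
    group_id="a522b679-015a-1000-0000-00003f33fd93",
    source_id="08873906-c9ca-1190-8376-b863cef4e797",       # UpdateAttribute
    destination_id="556f687a-c8d4-1145-0000-000032d88e41",  # LogAttribute
    relationships=["success"],
)
# Compact serialization, suitable as the request body.
body = json.dumps(payload, separators=(",", ":"))
```

Serialized compactly, the body should match the one passed to curl with -d in Step 6.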
Step 7 - Update LogAttribute to terminate on success relationship
Set auto-terminate on the success relationship for the newly added processor.
API
/nifi-api/processors/ProcessorUUID
PUT
curl -X PUT -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Content-Length: 439" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"component":{"id":"556f687a-c8d4-1145-0000-000032d88e41","name":"LogAttribute","config":{"concurrentlySchedulableTaskCount":"1","schedulingPeriod":"0 sec","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","schedulingStrategy":"TIMER_DRIVEN","comments":"","autoTerminatedRelationships":["success"]},"state":"STOPPED"},"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":5}}' https://nifi.com:9091/nifi-api/processors/556f687a-c8d4-1145-0000-000032d88e41 | gunzip
The UI will show that LogAttribute is in the "stopped" state and that success is checked under "Automatically Terminate Relationships".
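The revision version sent in Step 7 (5 in this example) has to match the processor's current revision, so it is safer to copy it from the processor as last returned by NiFi than to hard-code it. The sketch below assumes the current processor entity has already been fetched (for instance with a GET on /nifi-api/processors/ProcessorUUID) and that it carries revision and component objects; auto_terminate_payload and ARTICLE_CONFIG are illustrative names, and the config values are the ones used in this article.

```python
# Config values copied from the Step 7 request in this article.
ARTICLE_CONFIG = {
    "concurrentlySchedulableTaskCount": "1",
    "schedulingPeriod": "0 sec",
    "executionNode": "ALL",
    "penaltyDuration": "30 sec",
    "yieldDuration": "1 sec",
    "bulletinLevel": "WARN",
    "schedulingStrategy": "TIMER_DRIVEN",
    "comments": "",
}


def auto_terminate_payload(processor_entity, client_id, relationships):
    """Build the PUT body that auto-terminates the given relationships."""
    component = processor_entity["component"]
    config = dict(ARTICLE_CONFIG)
    config["autoTerminatedRelationships"] = list(relationships)
    return {
        "component": {
            "id": component["id"],
            "name": component["name"],
            "config": config,
            "state": "STOPPED",
        },
        # Reuse the version NiFi last reported instead of guessing it.
        "revision": {
            "clientId": client_id,
            "version": processor_entity["revision"]["version"],
        },
    }


# Stand-in for the entity returned by NiFi (shape assumed, values from this article).
current_entity = {
    "revision": {"version": 5},
    "component": {"id": "556f687a-c8d4-1145-0000-000032d88e41", "name": "LogAttribute"},
}
payload = auto_terminate_payload(
    current_entity, "556f3dcf-c8d4-1145-0d1b-539ef89a01da", ["success"]
)
```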
Step 8 - Start the processor
Payload
{ "revision": { "clientId": "a556f3dce-c8d4-1145-ff5c-6c6f302bd804", "version": 5 }, "component": { "id": "556f5bc4-c8d4-1145-0000-000059ff8789", "state": "RUNNING" } }
id = UUID of the processor to start (yours will be different)
API
/nifi-api/processors/ProcessorUUID
PUT
curl -X PUT -H "Accept: application/json, text/javascript, */*; q=0.01" -H "Accept-Encoding: gzip" -H "Accept-Language: en-US,en;q=0.5" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9" -H "Connection: keep-alive" -H "Content-Length: 152" -H "Content-Type: application/json" -H "Host: nifi.com:9091" -H "Referer: https://nifi.com:9091/nifi/login" -H "X-Requested-With: XMLHttpRequest" -d '{"revision":{"clientId":"556f3dcf-c8d4-1145-0d1b-539ef89a01da","version":5},"component":{"id":"556f687a-c8d4-1145-0000-000032d88e41","state":"RUNNING"}}' https://nifi.com:9091/nifi-api/processors/556f687a-c8d4-1145-0000-000032d88e41 | gunzip
The picture below shows that the flow has been updated via the REST API in real time, without any disruption or stoppage of the workflow.
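The Step 8 start request is small enough to generate on the fly. This is a minimal sketch under the same assumptions as the article's payload: run_state_payload is a hypothetical helper, only the two states that appear in this article are accepted, and the revision version should be whatever NiFi reported most recently for the processor.

```python
ALLOWED_STATES = ("RUNNING", "STOPPED")  # the two states used in this article


def run_state_payload(processor_id, client_id, version, state):
    """Build the PUT body that starts or stops a processor."""
    if state not in ALLOWED_STATES:
        raise ValueError("state must be one of %s, got %r" % (ALLOWED_STATES, state))
    return {
        "revision": {"clientId": client_id, "version": version},
        "component": {"id": processor_id, "state": state},
    }


start_payload = run_state_payload(
    processor_id="556f687a-c8d4-1145-0000-000032d88e41",  # LogAttribute
    client_id="556f3dcf-c8d4-1145-0d1b-539ef89a01da",
    version=5,
    state="RUNNING",
)
```

Calling the same helper with state="STOPPED" gives the body for stopping the processor again.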
Note - All of the actions described in this article may also be performed via the UI.
Created on 08-09-2017 07:21 PM
Thanks @smanjee
Good job! I am looking forward to Part II.