Today we will show how to interact with a NiFi instance to modify a flow at runtime via API.

Pre-requisites

  1. NiFi installed and running on localhost: https://nifi.apache.org/download.html
  2. Groovy, because its JSON builders and REST DSLs are great. If you are on a Mac, the easiest route is to run brew install groovy. To install Homebrew (the superb Mac package manager), visit http://brew.sh/
  3. Full script is available on GitHub: https://github.com/aperepel/nifi-rest-api-tutorial
  4. NiFi REST API Docs: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

Here’s the test flow we will be working with today:

429-screenshot.png

Prepare the test flow:

  • Add a PutFile component to the canvas.
  • Rename the processor to Save File (right-click -> Configure -> Settings -> Name field). We will be using this name to look up the processor later via API.
  • Add a GetHTTP processor and create a connection from GetHTTP to ‘Save File’. GetHTTP settings can be ignored for now; the Save File processor simply needs an input connected.
  • Set the Save File properties as below (these settings will be modified programmatically next)
  • Start the Save File processor (no need to start GetHTTP for our purposes)

430-screenshot.png

Note: for a more complex flow, one would use templates: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates

Working with the API

Next, we will update the Save File processor to use a different directory (/tmp/staging) and set Create Missing Directories to true.

High-level script flow:

  1. Search the data flow for a component to operate on. Lookup term is 'Save File'. This is the same API used by the Search field in the UI.

    441-screen-shot-2015-11-10-at-22917-pm.png

  2. Validate there’s only 1 processor returned - we want to make sure we’re modifying the expected one.
  3. Sync up with the framework state: fetch the latest value of the version field, which the update statement needs next. This is a classic Optimistic Locking pattern implementation.
  4. Build a small JSON document containing only state changes.
  5. Perform a partial update via a PUT operation.
  6. Repeat steps 4-5 three times: to stop the processor, to update its configuration (the Directory and Create Missing Directories properties), and to start it again.
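
Steps 1 and 2 above can be sketched without a running NiFi instance. The snippet below is only an illustration: the sample response and its field names (searchResultsDTO, processorResults, groupId) are assumptions about the shape of the search result, and findSingleProcessor is a hypothetical helper, not part of the tutorial script. Compare against a real response from your instance before relying on it.

```groovy
import groovy.json.JsonSlurper

// Hypothetical sample of a search response; the field names
// (searchResultsDTO, processorResults, groupId) are assumptions.
def sampleResponse = '''
{
  "searchResultsDTO": {
    "processorResults": [
      { "id": "c35f1bb7-5add-427f-864a-bdd23bb4ac7f",
        "groupId": "f1a2c4e8-b106-4877-97d9-9dbca868fc16",
        "name": "Save File" }
    ]
  }
}
'''

// Returns the single processor with the given name, or fails loudly
// when the search result is empty or ambiguous.
def findSingleProcessor(Map searchData, String processorName) {
    def results = searchData.searchResultsDTO?.processorResults ?: []
    def matches = results.findAll { it.name == processorName }
    if (matches.size() != 1) {
        throw new IllegalStateException(
            "Expected exactly 1 processor named '${processorName}', found ${matches.size()}")
    }
    matches[0]
}

def data = new JsonSlurper().parseText(sampleResponse)
def processor = findSingleProcessor(data, 'Save File')
println "Found the component, id/group: ${processor.id}/${processor.groupId}"
```

Failing with an exception when the match count is not exactly one keeps the script from modifying an unexpected processor.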

For the impatient among us, execute the script directly (clone the GitHub repository if you want to play with the code later):

groovy https://raw.githubusercontent.com/aperepel/nifi-rest-api-tutorial/master/reconfigure.groovy

You will see output similar to this:

Looking up a component to update...
Found the component, id/group:  c35f1bb7-5add-427f-864a-bdd23bb4ac7f/f1a2c4e8-b106-4877-97d9-9dbca868fc16
Preparing to update the flow state...
Stopping the processor to apply changes...
Updating processor...
{
    "revision": {
        "clientId": "my awesome script",
        "version": 309
    },
    "processor": {
        "id": "c35f1bb7-5add-427f-864a-bdd23bb4ac7f",
        "config": {
            "properties": {
                "Directory": "/tmp/staging",
                "Create Missing Directories": "true"
            }
        }
    }
}
Updated ok.
Bringing the updated processor back online...
Ok

If you check the NiFi processor again, you will see the updated Directory and Create Missing Directories values. Additionally, every step has been captured and recorded in the flow history:

444-screenshot.png

If you see a warning message in the UI, simply hit the Refresh link right next to it. I will explain the concurrency controls at the end of this article.

Code Walkthrough

First, we will pull in the RESTClient dependency (https://github.com/jgritman/httpbuilder/wiki/RESTClient). It is available in a public Maven repository and is fetched automatically.

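// @Grab must annotate a declaration, so it is attached to the import below
@Grab(group='org.codehaus.groovy.modules.http-builder',
        module='http-builder',
        version='0.7.1')
import groovyx.net.http.RESTClient
import static groovyx.net.http.ContentType.JSON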

This allows us to use a nice REST DSL like this:

nifi.get(
    path: 'controller/search-results',
    query: [q: processorName]
)

nifi.put(
    path: "controller/process-groups/$processGroup/processors/$processorId",
    body: builder.toPrettyString(),
    requestContentType: JSON
)

Next, we use Groovy's JSON builder to construct a JSON document for a partial PUT update, i.e. one that specifies only the properties you want to change:

builder {
    revision {
        clientId 'my awesome script'
        version resp.data.revision.version
    }
    processor {
        id "$processorId"
        config {
            properties {
                'Directory' '/tmp/staging'
                'Create Missing Directories' 'true'
            }
        }
    }
}

The dot-notation expressions (such as resp.data.revision.version) navigate the JSON document tree of a previous response. To understand how to structure the update document, start by issuing a GET request against your processor, which fetches its complete state document.
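
To make the navigation and the partial documents concrete, here is a minimal offline sketch. It assumes a trimmed-down state document (the sample JSON, the state field and its STOPPED/RUNNING values are assumptions, as is the hypothetical partialUpdate helper) and builds the three documents used to stop, reconfigure and restart the processor. It passes maps to JsonBuilder instead of using the closure syntax, purely to keep the helper generic.

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Hypothetical, trimmed-down state document as a GET might return it.
def sampleState = '''
{
  "revision": { "clientId": "someone else", "version": 309 },
  "processor": { "id": "c35f1bb7-5add-427f-864a-bdd23bb4ac7f", "state": "RUNNING" }
}
'''

// Builds a partial-update document: the revision block plus only the
// processor fields that should change.
String partialUpdate(long version, String processorId, Map changes) {
    def builder = new JsonBuilder()
    builder(
        revision: [clientId: 'my awesome script', version: version],
        processor: [id: processorId] + changes
    )
    builder.toPrettyString()
}

def state = new JsonSlurper().parseText(sampleState)

// Dot notation walks the parsed tree: revision -> version, processor -> id.
long version = state.revision.version
String processorId = state.processor.id

// In a real run each PUT would use the version from the latest response;
// one value is reused here only because no server is involved.
def stopDoc = partialUpdate(version, processorId, [state: 'STOPPED'])
def configDoc = partialUpdate(version, processorId,
    [config: [properties: ['Directory': '/tmp/staging',
                           'Create Missing Directories': 'true']]])
def startDoc = partialUpdate(version, processorId, [state: 'RUNNING'])

println configDoc
```

Each document carries only the revision and the fields being changed, which is what makes the PUT a partial update.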

Tip: The UI does everything through the REST API, so it is a great interactive learning tool in itself. One note, though: the UI interchangeably uses both PUT and POST (form) requests, so choose whichever is more convenient. In this write-up we use PUT with JSON.

Finally, the clientId and version business is explained in the next section.

Optimistic Locking in NiFi

The diagram below describes the concept.

Supplying a clientId is required for update operations to avoid running into consistency issues. Otherwise the API will respond with a 409 Conflict status code, which is really confusing if a developer doesn’t know about this attribute.

controller/revision returns, among other things, the clientId of the user who last modified the flow. This is NOT always your id; the best practice is to supply your own unique value to identify the client. It is actually a free-form value; a UUID is just the default that the framework generates for you if it is missing.

442-nifi-optimistic-locking-2-users.png
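
The two-user scenario can also be imitated with a toy in-memory model. This is only a sketch of the general optimistic locking idea, not NiFi's implementation: the VersionedFlow class, its update method, the user names and the /tmp/other directory are all hypothetical.

```groovy
// Toy in-memory model of a versioned resource; not NiFi's implementation.
class VersionedFlow {
    long version = 0
    Map settings = [:]

    // Applies the change only if the caller saw the latest version and
    // returns an HTTP-like status code.
    synchronized int update(long clientVersion, Map changes) {
        if (clientVersion != version) {
            return 409   // stale revision: someone else updated first
        }
        settings.putAll(changes)
        version++
        return 200
    }
}

def flow = new VersionedFlow()

// Both users read the same version before either of them writes.
long seenByAlice = flow.version
long seenByBob = flow.version

def aliceStatus = flow.update(seenByAlice, [Directory: '/tmp/staging'])
def bobStatus = flow.update(seenByBob, [Directory: '/tmp/other'])

// Bob re-syncs to the current version and retries.
def bobRetry = flow.update(flow.version, [Directory: '/tmp/other'])

println "alice=$aliceStatus bob=$bobStatus bobRetry=$bobRetry"
// prints "alice=200 bob=409 bobRetry=200"
```

The second writer is rejected rather than silently overwriting the first, which is why the script re-reads the version before each update.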

Comments

Great article. Thanks for sharing :)

Explorer

hello Andrew

thx for this nice article and for the script!

but i have connection problem trying to connect to my nifi instance .. from the groovy script

Connection to http://XXXXXXX:8080 refused

and my nifi logs records this line ...

2016-01-20 10:16:16,915 INFO [NiFi Web Server-79] org.apache.nifi.web.filter.RequestLogger Attempting request for (anonymous) GET http://XXXXXXX:8080/nifi-api/controller/status (source ip: YYYYYYY)

Should I be authenticated (not anonymous) for the script to be authorized to connect?

Philippe

Best regards

Explorer

it works now thanks !:-)

My Save file processor was not started so it was generating a conflict

Explorer

Great article, very informative. But in my situation there are multiple target directories to be created at runtime. Is it possible to handle them using a single PutFile processor? How do I handle that, or create multiple PutFile processors at runtime?

Expert Contributor

Hi @Andrew Grande

Does nifi-api-deploy work with nifi-1.0.0?

I tried it and got this error. Can you please suggest what's wrong? Thanks.

2016/12/02 10:57:45:199 UTC [DEBUG] BasicClientConnectionManager - Get connection for route {}->http://localhost:8080
2016/12/02 10:57:45:219 UTC [DEBUG] DefaultClientConnectionOperator - Connecting to localhost:8080
2016/12/02 10:57:45:249 UTC [DEBUG] RequestAddCookies - CookieSpec selected: best-match
2016/12/02 10:57:45:265 UTC [DEBUG] RequestAuthCache - Auth cache not set in the context
2016/12/02 10:57:45:265 UTC [DEBUG] RequestTargetAuthentication - Target auth state: UNCHALLENGED
2016/12/02 10:57:45:266 UTC [DEBUG] RequestProxyAuthentication - Proxy auth state: UNCHALLENGED
2016/12/02 10:57:45:266 UTC [DEBUG] DefaultHttpClient - Attempt 1 to execute request
2016/12/02 10:57:45:267 UTC [DEBUG] DefaultClientConnection - Sending request: GET /nifi-api/controller/process-groups/root/process-group-references HTTP/1.1
2016/12/02 10:57:45:267 UTC [DEBUG] wire - >> "GET /nifi-api/controller/process-groups/root/process-group-references HTTP/1.1[\r][\n]"
2016/12/02 10:57:45:269 UTC [DEBUG] wire - >> "Accept: */*[\r][\n]"
2016/12/02 10:57:45:269 UTC [DEBUG] wire - >> "Host: localhost:8080[\r][\n]"
2016/12/02 10:57:45:269 UTC [DEBUG] wire - >> "Connection: Keep-Alive[\r][\n]"
2016/12/02 10:57:45:269 UTC [DEBUG] wire - >> "[\r][\n]"
2016/12/02 10:57:45:269 UTC [DEBUG] headers - >> GET /nifi-api/controller/process-groups/root/process-group-references HTTP/1.1
2016/12/02 10:57:45:269 UTC [DEBUG] headers - >> Accept: */*
2016/12/02 10:57:45:269 UTC [DEBUG] headers - >> Host: localhost:8080
2016/12/02 10:57:45:269 UTC [DEBUG] headers - >> Connection: Keep-Alive
2016/12/02 10:57:45:271 UTC [DEBUG] wire - << "HTTP/1.1 404 Not Found[\r][\n]"
2016/12/02 10:57:45:273 UTC [

[ERROR] HTTP call failed. Status code: HTTP/1.1 404 Not Found: The specified resource could not be found.
Caught: java.lang.AssertionError: Terminated script execution. Expression: null
java.lang.AssertionError: Terminated script execution. Expression: null

Expert Contributor

Hi @Andrew Grande

I tried to run the reconfigure.groovy script; however, the search API can't find anything. I tried the name and the UUID.

Caught: groovyx.net.http.HttpResponseException: Not Found
groovyx.net.http.HttpResponseException: Not Found
    at groovyx.net.http.RESTClient.defaultFailureHandler(RESTClient.java:263)
    at groovyx.net.http.HTTPBuilder$1.handleResponse(HTTPBuilder.java:503)
    at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:1070)
    at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:1044)
    at groovyx.net.http.HTTPBuilder.doRequest(HTTPBuilder.java:515)

Explorer

Seems like the controller/templates API is deprecated. Can you confirm? What is the alternate way ?

One question.. Does the ID of a Processor or ProcessFlow change if NiFi is rebooted?

Version history: revision 2 of 2, last updated 08-17-2019 01:49 PM