12-09-2017
06:21 PM
This is great thanks! Also, before too long there should be a NiFi processor and controller service to help with some of the session management (NIFI-4683).
11-16-2017
02:17 PM
1 Kudo
Thanks very much! I hope to write another series for InvokeScriptedProcessor, ScriptedReportingTask, ScriptedReader, and ScriptedRecordSetWriter someday 🙂
11-01-2017
01:13 PM
This feature was added in NiFi 1.4.0 (NIFI-4257)
10-25-2017
08:57 PM
6 Kudos
Some NiFi Expression Language (EL) expressions can be fairly complex, or used in a large flow, or both. These can make it difficult to test an EL expression on a running NiFi system. Although an excellent feature of NiFi is being able to adapt the flow while the system is running, it may not be prudent to stop a downstream processor, reroute a connection to something like UpdateAttribute, then list the queue in order to see attributes, content, etc. To make EL testing easier, I wrote a Groovy script called testEL.groovy that uses the same EL library that NiFi does, so all functions present in the specified NiFi version are available to the test tool. The following is the usage:
usage: groovy testEL.groovy [options] [expressions]
Options:
-D <attribute=value> set value for given attribute
-help print this message
As an example, the following tests an expression that appends "_world" to the "filename" attribute:
> groovy testEL.groovy -D filename=hello '${filename:append("_world")}'
hello_world
Note that it accepts multiple attribute definitions and multiple expressions, so you can test more than one expression using a single set of attributes:
> groovy testEL.groovy -D filename=hello -D size=10 '${filename:append("_world")}' '${filename:prepend("I say "):append(" ${size} times")}'
hello_world
I say hello 10 times
In order to attach testelgroovy.txt to this post, I had to add a .txt extension (and it lowercased the name), so simply rename it to testEL.groovy before running the above. Hopefully you find this script helpful. If you try it, please let me know how/if it works for you. As always, I welcome any questions, comments and suggestions on how to make things better 🙂 Cheers!
08-24-2017
06:25 PM
In QueryDatabaseTable, you'd set the Maximum-Value Column to "id" and add a dynamic property named "initial.maxvalue.id" with a value of 50. Make sure state has been cleared before running; the first time it executes, it will grab all rows with id > 50. This same capability for GenerateTableFetch is not yet available (NIFI-4283) but is coming soon.
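To make the effect of the initial maximum value concrete, here is a minimal sketch in plain Python. It is not NiFi code: `fetch_new_rows`, the `state` dictionary, and the sample rows are hypothetical stand-ins that only imitate the behavior described above (use the stored maximum if there is one, otherwise the initial value, then remember the largest value seen).

```python
# Toy model of incremental fetching by a maximum-value column.
# Hypothetical names throughout; this only imitates the described behavior.

def fetch_new_rows(rows, state, column, initial_max=None):
    """Return rows whose `column` exceeds the remembered maximum, then update state."""
    # Prefer the stored maximum; fall back to the initial value when state is empty
    current_max = state.get(column, initial_max)
    if current_max is None:
        new_rows = list(rows)
    else:
        new_rows = [row for row in rows if row[column] > current_max]
    if new_rows:
        # Remember the largest value seen so far for the next run
        state[column] = max(row[column] for row in new_rows)
    return new_rows

rows = [{"id": value} for value in (10, 50, 51, 75)]
state = {}  # cleared state, as recommended above

first_run = fetch_new_rows(rows, state, "id", initial_max=50)
second_run = fetch_new_rows(rows, state, "id", initial_max=50)
```

In this sketch the first run returns only the rows with id 51 and 75, and the second run returns nothing because the stored maximum now takes precedence over the initial value.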
01-12-2017
08:36 PM
15 Kudos
This article describes various "recipes" on how to accomplish certain tasks with the NiFi processor ExecuteScript, with examples given in Groovy, Jython, Javascript (Nashorn), and JRuby. This is Part 3 in the series, in which I discuss advanced features such as dynamic properties, modules, state management, and accessing/using Controller Services.
Part 1 - Introduction to the NiFi API and FlowFiles
Getting a flow file from an incoming queue
Creating new flow files
Working with flow file attributes
Transferring a flow file
Logging
Part 2 - FlowFile I/O and Error Handling
Reading from a flow file
Writing to a flow file
Reading and writing to/from a flow file
Error Handling
Part 3 - Advanced Features
Using Dynamic Properties
Adding Modules
State Management
Accessing Controller Services
Advanced Features
The first two articles in the series covered the basics of flow file operations, such as reading/writing attributes and content, as well as retrieving and transferring flow files using the "session" variable (a ProcessSession object). There are many more capabilities available to ExecuteScript; I will talk about a few of them here.
Dynamic Properties
One such capability is the concept of dynamic properties, also called User-Defined properties. These are properties for a processor for which a user can set both the property name and value. Not all processors support/use dynamic properties, but ExecuteScript will pass dynamic properties as variables which reference a PropertyValue object corresponding to the property's value. There are two important things to note here:
Because the property name is bound as-is to a variable name, the naming convention for dynamic properties must be supported for the specified programming language. For example, Groovy does not support period (.) as a valid variable character, so a dynamic property such as "my.value" will cause the processor to fail. A valid alternative in this case is "myValue".
The PropertyValue object is used (rather than a String representation of the value) to allow the script to perform various operations on the property's value before evaluating it to a String. If the property is known to contain a literal value, you can call the getValue() method on the variable to get its String representation. If instead the value could contain Expression Language or you want to cast the value to something other than String (such as the value 'true' to a Boolean object), there are methods for these operations too. These examples are illustrated in the recipes below, assuming we have two properties 'myProperty1' and 'myProperty2' defined as such:
Recipe: Get the value of a dynamic property
Use Case: A user has entered a dynamic property for use in the script (configuration parameter, e.g.).
Approach: Use the getValue() method from the variable's PropertyValue object. This method returns a String representation of the value of the dynamic property. Note that if Expression Language is present in the value, getValue() will not evaluate it (see the following recipe for that functionality).
Examples:
Groovy
def myValue1 = myProperty1.value
Jython
myValue1 = myProperty1.getValue()
Javascript
var myValue1 = myProperty1.getValue()
JRuby
myValue1 = myProperty1.getValue()
Recipe: Get the value of a dynamic property after evaluating Expression Language constructs
Use Case: A user has entered a dynamic property for use in the script (configuration parameter, e.g.), and it refers to attribute(s) in an incoming flow file.
Approach: Use the evaluateAttributeExpressions(flowFile) method from the variable's PropertyValue object. This method, followed by getValue(), returns a String representation of the value of the dynamic property after any Expression Language constructs have been evaluated. If a flow file is not available, but variables have been defined in the environment or Variable Registry, you can use evaluateAttributeExpressions() without a parameter.
Examples:
Groovy
def myValue1 = myProperty1.value
def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value
Jython
myValue1 = myProperty1.getValue()
myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
Javascript
var myValue1 = myProperty1.getValue()
var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
JRuby
myValue1 = myProperty1.getValue()
myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
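The difference between the two recipes above (raw value versus evaluated value) can be tried outside NiFi with a small stand-in. The sketch below is plain Python and purely illustrative: `ToyPropertyValue` is a hypothetical class, it only resolves bare `${attribute}` references (no EL functions), and treating a missing attribute as an empty string is an assumption of the toy.

```python
import re

class ToyPropertyValue(object):
    """Hypothetical stand-in for a PropertyValue; resolves only ${attribute} references."""

    _REFERENCE = re.compile(r"\$\{([A-Za-z0-9_.]+)\}")

    def __init__(self, raw):
        self._raw = raw

    def getValue(self):
        # Returns the raw text without evaluating anything
        return self._raw

    def evaluateAttributeExpressions(self, attributes=None):
        # Substitute each ${name} with the matching attribute (empty string if absent)
        attributes = attributes or {}
        resolved = self._REFERENCE.sub(
            lambda match: attributes.get(match.group(1), ""), self._raw)
        return ToyPropertyValue(resolved)

    def asBoolean(self):
        # Interpret the text 'true' (any case) as True, anything else as False
        return None if self._raw is None else self._raw.lower() == "true"

# Hypothetical property values and flow file attributes
myProperty1 = ToyPropertyValue("Hello")
myProperty2 = ToyPropertyValue("${filename}_processed")
attributes = {"filename": "data.csv"}

literal = myProperty2.getValue()
evaluated = myProperty2.evaluateAttributeExpressions(attributes).getValue()
flag = ToyPropertyValue("true").asBoolean()
```

Here `literal` still contains the unevaluated `${filename}` reference, while `evaluated` has the attribute substituted, which mirrors why the second recipe calls evaluateAttributeExpressions() before getValue().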
Adding Modules
Another feature of ExecuteScript is the ability to add external "modules" to the classpath, which allows you to leverage various third-party libraries, scripts, etc. However, each of the script engines handles the concept of modules differently, so I will discuss them separately. In general though, there are two types of modules: Java libraries (JARs) and scripts (written in the same language as the one in ExecuteScript). Here is how the various script engines handle these:
Groovy
The Groovy script engine (at least for use in ExecuteScript) does not support the importing of other Groovy scripts; instead it allows for JARs to be added to its classpath. So for external Groovy projects, consider compiling into bytecode and pointing at the classes folder or packaging into a JAR.
When using Groovy, the Module Directory property can be set to a comma-separated list of files (JARs) and folders. If a folder is specified, ExecuteScript will find any JARs in that folder and add those as well. This allows you to include third-party software that consists of a large number of JARs. For an example in Groovy, see this blog post.
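As a rough illustration of the folder handling just described, the following plain-Python sketch expands a comma-separated Module Directory value into classpath entries. The function name `expand_module_entries` and the sample path are hypothetical, and the real engine's behavior may differ in detail (for example in ordering or in how nested folders are treated).

```python
import os

def expand_module_entries(module_directory):
    """Expand a comma-separated list of JAR files and folders into classpath entries."""
    entries = []
    for entry in (part.strip() for part in module_directory.split(",")):
        if not entry:
            continue
        if os.path.isdir(entry):
            # Keep the folder itself (it may hold compiled classes) ...
            entries.append(entry)
            # ... and add every JAR found directly inside it
            for name in sorted(os.listdir(entry)):
                if name.lower().endswith(".jar"):
                    entries.append(os.path.join(entry, name))
        elif entry.lower().endswith(".jar"):
            entries.append(entry)
    return entries

# Hypothetical single-JAR setting
single = expand_module_entries("/opt/libs/extra.jar")
```

A folder entry contributes itself plus each JAR it contains, while a file entry is used only if it is a JAR.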
Jython
The Jython script engine (at least for use in ExecuteScript) currently only supports the importing of pure Python modules, not natively-compiled modules (CPython, e.g.) such as numpy or scipy. It also does not currently support JARs, although this may change in an upcoming release. See this HCC post for more details. Under the hood, the entries in the Module Directory property are prepended to your script before execution, using "import sys" followed by "sys.path.append" for each of the specified module locations.
If you have Python installed, you can make use of all its installed pure Python modules by adding its site-packages folder to the Module Directory property, such as
/usr/local/lib/python2.7/site-packages
Then in your script you can import from the various packages, such as
from user_agents import parse
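The "prepended to your script" behavior described above can be pictured with a short sketch. This is plain Python that only builds the text of such a preamble; `build_jython_preamble` is a hypothetical helper, `/opt/nifi/scripts` is a made-up second location, and the exact text NiFi generates may differ.

```python
def build_jython_preamble(module_directory):
    """Build the 'import sys' / 'sys.path.append' lines for each module location."""
    lines = ["import sys"]
    for entry in module_directory.split(","):
        entry = entry.strip()
        if entry:
            # repr() quotes the path so it is a valid string literal in the script
            lines.append("sys.path.append(%r)" % entry)
    return "\n".join(lines) + "\n"

preamble = build_jython_preamble(
    "/usr/local/lib/python2.7/site-packages, /opt/nifi/scripts")
```

The resulting text is what would sit ahead of the user's script, which is why imports such as the user_agents example resolve once the site-packages folder is listed.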
Javascript
The Javascript script engine (at least for use in ExecuteScript) allows for the same kinds of JARs/folders as the Groovy engine. It looks for JARs, and if a folder is specified, it also looks in that folder for JARs.
JRuby
The JRuby script engine (at least for use in ExecuteScript) currently only allows for the specification of individual JARs. If a folder is specified, it must have classes in it (same as the java compiler would want to see it); if the folder contains JARs, they will not be automatically picked up. Also, no pure Ruby modules can currently be imported.
I intend to improve on all the engines in the future, to give a more powerful yet consistent user experience.
State Management
NiFi (as of 0.5.0 I believe) offers the capability for Processors and other NiFi components to persist some information to enable some stateful functionality around the component. For example, the QueryDatabaseTable processor keeps track of the largest values it has seen for the specified columns, such that the next time it runs, it will only fetch rows whose values are larger than those that have been seen so far (i.e. stored in the State Manager).
An important concept in terms of state management is Scope. NiFi components can choose to store their state at the cluster level or the local level. Note that in a standalone NiFi instance, "cluster scope" is identical to "local scope". The choice of scope is usually about whether in a flow, the same processor on each node can share the state data. If the instances across the cluster do not need to share state, then use local scope. In Java these options are provided as an enum called Scope, so when I refer to Scope.CLUSTER and Scope.LOCAL, I mean cluster and local scope respectively.
To employ state management features in ExecuteScript (language-specific examples are below), you get a reference to the StateManager by calling ProcessContext's getStateManager() method (recall that each engine gets a variable named "context" with an instance of ProcessContext). You can then call the following methods on the StateManager object:
void setState(Map<String, String> state, Scope scope) - Updates the value of the component's state at the given scope, setting it to given value. Notice that the value is a Map; the concept of "component state" is the Map of all key/value pairs that each constitute a lower-level of state. The Map is updated all at once to provide atomicity.
StateMap getState(Scope scope) - Returns the current state for the component at the given scope. This method never returns null; rather it is a StateMap object and if the state has not yet been set, the StateMap's version will be -1, and the map of values will be empty. Often a new Map<String,String> will be created to store the updated values, then setState() or replace() will be called.
boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) - Updates the value of the component's state (at the given scope) to the new value if and only if the value currently is the same as the given oldValue. If the state was updated to the new value, true is returned; otherwise false is returned if the state's value was not equal to oldValue.
void clear(Scope scope) - Clears all keys and values from the component's state at the given scope.
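To get a feel for how these four methods interact, here is a small in-memory imitation in plain Python. It is a sketch under assumptions, not the NiFi implementation: `ToyStateManager` and `ToyStateMap` are hypothetical names, the scope argument is left out, the version simply counts updates starting from -1, and replace() is made to fail on unset state to match the note in the update recipe further down.

```python
class ToyStateMap(object):
    """Snapshot of the state: a copy of the values plus a version number."""

    def __init__(self, values, version):
        self._values = dict(values)
        self.version = version

    def toMap(self):
        return dict(self._values)

    def get(self, key):
        return self._values.get(key)


class ToyStateManager(object):
    """Single-scope, in-memory imitation of setState/getState/replace/clear."""

    def __init__(self):
        self._values = {}
        self._version = -1  # -1 means the state has never been set

    def getState(self):
        # Never returns None; an unset state is an empty map with version -1
        return ToyStateMap(self._values, self._version)

    def setState(self, new_values):
        # The whole map is swapped at once
        self._values = dict(new_values)
        self._version += 1

    def replace(self, old_state, new_values):
        # Compare-and-set: only update if nothing changed since old_state was read
        if self._version == -1 or old_state.version != self._version:
            return False
        self.setState(new_values)
        return True

    def clear(self):
        self.setState({})


manager = ToyStateManager()
unset_version = manager.getState().version

manager.setState({"count": "1"})
snapshot = manager.getState()
updated = manager.replace(snapshot, {"count": "2"})   # snapshot is current
rejected = manager.replace(snapshot, {"count": "3"})  # snapshot is now stale
final_map = manager.getState().toMap()
```

The second replace() is rejected because another update happened after the snapshot was taken, which is the situation the compare-and-set style of replace() is meant to detect.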
Recipe: Get the current map of key/value pairs
Use Case: The script needs to get the current key/value pairs from the state manager for use in the script (update, e.g.).
Approach: Use the getStateManager() method from ProcessContext, then getState(scope) from the StateManager, then toMap() to convert to a Map<String,String> of key/value pairs. Note that StateMap also has a get(key) method for simply retrieving a value, but this is not often used as the Map is usually updated and must be set for the StateManager once this is done.
Examples:
Groovy
import org.apache.nifi.components.state.Scope
def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
Jython
from org.apache.nifi.components.state import Scope
oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
Javascript
var Scope = Java.type('org.apache.nifi.components.state.Scope');
var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
JRuby
java_import org.apache.nifi.components.state.Scope
oldMap = context.stateManager.getState(Scope::LOCAL).toMap()
NOTE: Only the Scope class was referenced explicitly in the script, so it is the only one imported. If you refer to StateManager, StateMap, etc. you will need to import those classes as well.
Recipe: Update the map of key/value pairs
Use Case: The script wants to update the state map with a new Map of key/value pairs.
Approach: To get the current StateMap object, again use the getStateManager() method from ProcessContext, then getState(scope) from the StateManager. The examples assume a new Map, but using the above recipe (with the toMap() method), you could create a new Map using the existing values, then just update the desired entries. Note that replace() will not work if there is no current map (i.e. the StateMap.getVersion() returns -1), so the examples check and call setState() or replace() accordingly. When running from a new instance of ExecuteScript, the StateMap version will be -1, so after a single execution, if you right-click on the ExecuteScript processor and choose View State, you should see something like this:
Examples:
Groovy
import org.apache.nifi.components.state.Scope
def stateManager = context.stateManager
def stateMap = stateManager.getState(Scope.CLUSTER)
def newMap = ['myKey1': 'myValue1']
if (stateMap.version == -1) {
    stateManager.setState(newMap, Scope.CLUSTER);
} else {
    stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}
Jython
from org.apache.nifi.components.state import Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope.CLUSTER)
newMap = {'myKey1': 'myValue1'}
if stateMap.version == -1:
    stateManager.setState(newMap, Scope.CLUSTER)
else:
    stateManager.replace(stateMap, newMap, Scope.CLUSTER)
Javascript
var Scope = Java.type('org.apache.nifi.components.state.Scope');
var stateManager = context.stateManager;
var stateMap = stateManager.getState(Scope.CLUSTER);
var newMap = {'myKey1': 'myValue1'};
if (stateMap.version == -1) {
    stateManager.setState(newMap, Scope.CLUSTER);
} else {
    stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}
JRuby
java_import org.apache.nifi.components.state.Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope::CLUSTER)
newMap = {'myKey1'=> 'myValue1'}
if stateMap.version == -1
  stateManager.setState(newMap, Scope::CLUSTER)
else
  stateManager.replace(stateMap, newMap, Scope::CLUSTER)
end
Recipe: Clear the state map
Use Case: The script wants to clear all the key/value pairs for the state map.
Approach: Use the getStateManager() method from ProcessContext, then call StateManager's clear(scope) method.
Examples:
Groovy
import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope.LOCAL)
Jython
from org.apache.nifi.components.state import Scope
context.stateManager.clear(Scope.LOCAL)
Javascript
var Scope = Java.type('org.apache.nifi.components.state.Scope');
context.stateManager.clear(Scope.LOCAL);
JRuby
java_import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope::LOCAL)
Accessing Controller Services
In the NiFi ARchive (NAR) structure, Controller Services are exposed as interfaces, usually in an API JAR. For example, DistributedMapCacheClient is an interface extending ControllerService, and it resides in the nifi-distributed-cache-client-service-api JAR, which is located in the nifi-standard-services-api-nar NAR. Other NARs that wish to reference these interfaces (to create a new type of client implementation, e.g.) must specify nifi-standard-services-api-nar as their parent NAR, and then refer to provided instances of the API JARs in the processor submodule.
That's a little more low-level detail than you need to be able to leverage Controller Services, but I mention it for two reasons:
1. Prior to NiFi 1.0.0, the scripting NAR (which includes ExecuteScript and InvokeScriptedProcessor) did not specify nifi-standard-services-api-nar as its parent. That meant that only implicit references could be used for ControllerService interfaces (and their implementations), and for the same reason, only interface methods that do not require other unavailable classes could be used. This limited the usefulness of ExecuteScript in terms of using Controller Services, but for a working example you can read my blog post.
2. As of NiFi 1.0.0, the scripting processors do have access to some of the Controller Service interfaces (and associated classes) in the nifi-standard-services-api-nar. These include DBCPService, DistributedMapCacheClient, DistributedSetCacheClient, HttpContextMap and SSLContextService. However, I don't believe the other APIs included in the nifi-standard-services-api-nar will be available, and no custom ControllerService interfaces will be recognized. Having said that, you may be able to use the approach from the blog post in #1 to do so.
Processors that always intend on using a Controller Service instance create a property (i.e. PropertyDescriptor object) and call identifiesControllerService(class) on it. When the UI component is rendered, it will find all the Controller Services that implement the desired interface, and under-the-hood the component's ID is used, but its friendly name is displayed to the user.
For ExecuteScript, we can let the user choose a Controller Service instance by letting them specify the name or ID. If we allow the user to specify the name, then the script will have to perform a lookup trying to match the name to a (single!) element in the list of Controller Service instances of that type. That technique is described in the blog post above, so I won't cover it here. If the user enters the ID of the instance, then (as of NiFi 1.0.0) it is much easier to get access to the object as we will see below. The examples will use a DistributedMapCacheClientService instance called "distMapClient", connecting to a DistributedMapCacheServer instance (with standard defaults, localhost:4557), where the client instance has an ID of 93db6734-0159-1000-b46f-78a8af3b69ed:
In the ExecuteScript config, a dynamic property is created, called "clientServiceId" and set to 93db6734-0159-1000-b46f-78a8af3b69ed:
We can then use clientServiceId.asControllerService(DistributedMapCacheClient), where the parameter is a reference to the Class object for DistributedMapCacheClient. For the examples, I have pre-populated the cache with a String key 'a' set to the String value 'hello'. For a handy Groovy script to work with a DistributedMapCacheServer, check out my article here.
Once we have a DistributedMapCacheClient instance, then to retrieve a value we can call its get(key, serializer, deserializer) method. In our case, since the keys and values are Strings, we just need an instance of Serializer<String> and Deserializer<String> to pass into the get() method. The approach for all the languages is similar to the creation of the StreamCallback instances described in Part 2 of this article series. The examples will get the value for key 'a' from the pre-populated server and log the result ("Result = hello").
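The roles of the serializer and deserializer in that get() call can be sketched without a cache server. The plain-Python example below is a toy: `ToyMapCacheClient` is a hypothetical in-memory stand-in (not the NiFi client), and its put() method is included only so there is something to read back. The key is serialized to bytes for the lookup, and the stored bytes are deserialized into the returned value.

```python
import io

class StringSerializer(object):
    """Writes a string to an output stream as UTF-8 bytes."""
    def serialize(self, value, out):
        out.write(value.encode("utf-8"))

class StringDeserializer(object):
    """Turns UTF-8 bytes back into a string."""
    def deserialize(self, data):
        return data.decode("utf-8")

class ToyMapCacheClient(object):
    """Hypothetical in-memory stand-in that stores serialized keys and values."""

    def __init__(self):
        self._store = {}

    def _to_bytes(self, value, serializer):
        # Let the serializer write into a buffer, then take the bytes
        buffer = io.BytesIO()
        serializer.serialize(value, buffer)
        return buffer.getvalue()

    def put(self, key, value, key_serializer, value_serializer):
        self._store[self._to_bytes(key, key_serializer)] = \
            self._to_bytes(value, value_serializer)

    def get(self, key, key_serializer, value_deserializer):
        raw = self._store.get(self._to_bytes(key, key_serializer))
        return None if raw is None else value_deserializer.deserialize(raw)

client = ToyMapCacheClient()
client.put("a", "hello", StringSerializer(), StringSerializer())
result = client.get("a", StringSerializer(), StringDeserializer())
```

In this toy, a key that was never stored yields None; how the real client reports a missing key is not covered here.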
Recipe: Get the value of a property stored in a DistributedMapCacheServer
Use Case: A user has populated values into a DistributedMapCacheServer (configuration data, e.g.) and the script wants access to them.
Approach: Use the approach described above, creating a StringSerializer and StringDeserializer object, then getting the DistributedMapCacheClientService instance by ID, then calling get() on the service. Logging of the result is included here for convenience.
Examples:
Groovy
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer
import java.nio.charset.StandardCharsets
def StringSerializer = {value, out -> out.write(value.getBytes(StandardCharsets.UTF_8))} as Serializer<String>
def StringDeserializer = { bytes -> new String(bytes, StandardCharsets.UTF_8) } as Deserializer<String>
def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
def result = myDistClient.get('a', StringSerializer, StringDeserializer)
log.info("Result = $result")
Jython
from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer
# Define a subclass of Serializer for use in the client's get() method
class StringSerializer(Serializer):
    def __init__(self):
        pass
    def serialize(self, value, out):
        out.write(value)
# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer(Deserializer):
    def __init__(self):
        pass
    def deserialize(self, bytes):
        return StringUtil.fromBytes(bytes)
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
result = myDistClient.get('a', StringSerializer(), StringDeserializer())
log.info('Result = ' + str(result))
Javascript
var DistributedMapCacheClient = Java.type('org.apache.nifi.distributed.cache.client.DistributedMapCacheClient');
var Serializer = Java.type('org.apache.nifi.distributed.cache.client.Serializer');
var Deserializer = Java.type('org.apache.nifi.distributed.cache.client.Deserializer');
var StandardCharsets = Java.type('java.nio.charset.StandardCharsets');
var StringSerializer = new Serializer(function(value, out) {
    out.write(value.getBytes(StandardCharsets.UTF_8));
})
var StringDeserializer = new Deserializer(function(arr) {
    // For some reason I had to build a string from the character codes in the "arr" array
    var s = "";
    for(var i = 0; i < arr.length; i++) {
        s = s + String.fromCharCode(arr[i]);
    }
    return s;
})
var myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.class);
var result = myDistClient.get('a', StringSerializer, StringDeserializer);
log.info("Result = "+ result);
JRuby
java_import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
java_import org.apache.nifi.distributed.cache.client.Serializer
java_import org.apache.nifi.distributed.cache.client.Deserializer
java_import java.nio.charset.StandardCharsets
# Define a subclass of Serializer for use in the client's get() method
class StringSerializer
  include Serializer
  def serialize(value, out)
    out.write(value.to_java.getBytes(StandardCharsets::UTF_8))
  end
end
# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer
  include Deserializer
  def deserialize(bytes)
    bytes.to_s
  end
end
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.java_class)
result = myDistClient.get('a', StringSerializer.new, StringDeserializer.new)
log.info('Result = ' + result)
This article contains much more complex examples of how to interact with the NiFi API using the various supported languages. I may add other parts to this series, or certainly other articles about the scripting processors, as improvements are made and additional features are added (such as the ScriptedReportingTask coming soon!). As always I welcome all questions, comments, and suggestions. Cheers!
12-31-2016
05:49 PM
19 Kudos
This article describes various "recipes" on how to accomplish certain tasks with the NiFi processor ExecuteScript, with examples given in Groovy, Jython, Javascript (Nashorn), and JRuby. This is Part 2 in the series, in which I discuss reading from and writing to flow file contents, as well as error handling.
Part 1 - Introduction to the NiFi API and FlowFiles
Getting a flow file from an incoming queue
Creating new flow files
Working with flow file attributes
Transferring a flow file
Logging
Part 2 - FlowFile I/O and Error Handling
Reading from a flow file
Writing to a flow file
Reading and writing to/from a flow file
Error Handling
Part 3 - Advanced Features
Using Dynamic Properties
Adding Modules
State Management
Accessing Controller Services
Introduction to FlowFile I/O
Flow files in NiFi are made of two major components, attributes and content. Attributes are metadata about the content / flow file, and we saw how to manipulate them using ExecuteScript in Part 1 of this series. The content of a flow file is, at its heart, simply a collection of bytes and has no inherent structure, schema, format, etc. Various NiFi processors assume the incoming flow files have a particular schema/format (or determine it from attributes such as "mime.type" or infer it in other ways). These processors can then act upon the content based on the assumption that the files really do have that format (and will often transfer to a "failure" relationship if they do not). Processors may also output flow files in a specified format; this is described in the processors' descriptions in the NiFi documentation.
Input and Output (I/O) for the contents of flow files is provided via the ProcessSession API and thus the "session" variable for ExecuteScript (see Part 1 for more information). One mechanism for this is to pass a callback object into a call to session.read() or session.write(). An InputStream and/or OutputStream will be created for the FlowFile object, and the callback object will be invoked using the corresponding callback interface, with the InputStream and/or OutputStream references passed in for use by the callback. There are three main callback interfaces, each with its own use case:
InputStreamCallback
This interface is used by the session.read( flowFile, inputStreamCallback) method to provide an InputStream from which to read the contents of the flow file. The interface has a single method:
void process(InputStream in) throws IOException
This interface provides a managed input stream for use. The input stream is automatically opened and closed though it is ok to close the stream manually. This is the form you would use if you are only reading from a particular flow file, and not writing back out to it.
An example is when you want to process an incoming flow file, but create many output flow files, such as the SplitText processor does.
OutputStreamCallback
This interface is used by the session.write( flowFile, outputStreamCallback) method to provide an OutputStream to which to write the contents of the flow file. The interface has a single method:
void process(OutputStream out) throws IOException
This interface provides a managed output stream for use. The output stream is automatically opened and closed though it is ok to close the stream manually - and quite important if any streams wrapping these streams open resources which should be cleared.
An example is when ExecuteScript will be generating data, either from within or from an external file, but not a flow file. Then you would use session.create() to create a new FlowFile, then session.write(flowFile, outputStreamCallback) to insert content.
StreamCallback
This interface is used by the session.write(flowFile, streamCallback) method to provide an InputStream and OutputStream, from which to read from and/or write to the contents of the flow file. The interface has a single method:
void process(InputStream in, OutputStream out) throws IOException
This interface provides managed input and output streams for use. The streams are automatically opened and closed, though it is ok to close them manually - and quite important if any streams wrapping these streams open resources which should be cleared.
An example is when you want to process an incoming flow file and overwrite its contents with something new, such as the EncryptContent processor does.
Since these callbacks are Java objects, the script will have to create one and pass it into the session method(s); the recipes will illustrate this for the various scripting languages. There are also other methods of reading from and writing to flow files, which include:
Using session.read(flowFile) to return an InputStream. This removes the need for an InputStreamCallback; instead it returns an InputStream that you can read from. In exchange you must manage (close, e.g.) the InputStream manually.
Using session.importFrom(inputStream, flowFile) to write from an InputStream to a FlowFile. This replaces the need for a session.write() with an OutputStreamCallback passed in.
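The managed-stream callback pattern described in this section can be sketched in a few lines of plain Python. Everything here is a hypothetical stand-in: `ToySession` keeps flow file content as bytes in a dictionary, plain functions play the role of the callback objects, and this toy expects callbacks to leave the streams open. It only illustrates who opens, passes, and closes the streams.

```python
import io

class ToySession(object):
    """Hypothetical stand-in that stores flow file content as bytes in a dict."""

    def __init__(self):
        self.content = {}

    def create(self, name):
        self.content[name] = b""
        return name

    def read(self, flow_file, callback):
        # The session opens the stream, hands it to the callback, then closes it
        stream = io.BytesIO(self.content[flow_file])
        try:
            callback(stream)
        finally:
            stream.close()

    def write(self, flow_file, callback):
        # The callback may read the old content and must write the new content
        in_stream = io.BytesIO(self.content[flow_file])
        out_stream = io.BytesIO()
        try:
            callback(in_stream, out_stream)
            self.content[flow_file] = out_stream.getvalue()
        finally:
            in_stream.close()
            out_stream.close()
        return flow_file

session = ToySession()
flow_file = session.create("example")

# Write-only use: ignore the input stream and generate content
session.write(flow_file, lambda _in, out: out.write(b"Hello world!"))

# Read-only use: data must be stored in an outer variable to be visible afterwards
captured = []
session.read(flow_file, lambda stream: captured.append(stream.read().decode("utf-8")))

# Read-and-write use: overwrite the content with a transformed copy
session.write(flow_file, lambda in_, out: out.write(in_.read().upper()))
```

Note how the read callback appends to `captured`: as with the Java callbacks, anything computed inside the callback is only visible outside it if it is stored in a more widely scoped variable.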
Now, on to the recipes 🙂
Recipes
Recipe: Read the contents of an incoming flow file using a callback
Use Case: You have incoming connection(s) to ExecuteScript and want to retrieve the contents of a flow file from the queue(s) for processing.
Approach: Use the read(flowFile, inputStreamCallback) method from the session object. An InputStreamCallback object is needed to pass into the read() method. Note that because InputStreamCallback is an object, the contents are only visible to that object by default. If you need to use the data outside the read() method, use a more globally-scoped variable. The examples will store the full contents of the incoming flow file into a String (using Apache Commons' IOUtils class). NOTE: For large flow files, this is not the best technique; rather you should read in only as much data as you need, and process that as appropriate. For something like SplitText, you could read in a line at a time and process it within the InputStreamCallback, or use the session.read(flowFile) approach mentioned earlier to get an InputStream reference to use outside of a callback.
Examples:
Groovy
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile)return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    // Do something with text here
} as InputStreamCallback)
Jython
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Do something with text here
# end class
flowFile = session.get()
if(flowFile != None):
    session.read(flowFile, PyInputStreamCallback())
# implicit return at the end
Javascript
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
var flowFile = session.get();
if(flowFile != null) {
// Create a new InputStreamCallback, passing in a function to define the interface method
session.read(flowFile,
new InputStreamCallback(function(inputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
// Do something with text here
}));
}
JRuby
java_import org.apache.commons.io.IOUtils
java_import org.apache.nifi.processor.io.InputStreamCallback
# Define a subclass of InputStreamCallback for use in session.read()
class JRubyInputStreamCallback
include InputStreamCallback
def process(inputStream)
text = IOUtils.toString(inputStream)
# Do something with text here
end
end
jrubyInputStreamCallback = JRubyInputStreamCallback.new
flowFile = session.get()
if flowFile != nil
session.read(flowFile, jrubyInputStreamCallback)
end
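The note about large flow files, and the remark about keeping data visible outside the callback, can be illustrated without NiFi. The following is a plain-Python sketch (not Jython, and not the NiFi API): `io.BytesIO` stands in for the flow file's InputStream, and `LineCountingCallback` and `read()` are hypothetical stand-ins for an InputStreamCallback and `session.read()`. It processes one line at a time and stores its results on the callback instance.

```python
import io


class LineCountingCallback(object):
    """Hypothetical stand-in for an InputStreamCallback implementation."""

    def __init__(self):
        # Results live on the instance so they can be read after process() returns
        self.line_count = 0
        self.longest_line = ''

    def process(self, input_stream):
        # Decode lazily and iterate line by line instead of loading everything
        reader = io.TextIOWrapper(input_stream, encoding='utf-8')
        for line in reader:
            line = line.rstrip('\n')
            self.line_count += 1
            if len(line) > len(self.longest_line):
                self.longest_line = line


def read(stream, callback):
    """Hypothetical stand-in for session.read(flowFile, callback)."""
    callback.process(stream)


callback = LineCountingCallback()
read(io.BytesIO(b'alpha\nbeta\ngamma delta\n'), callback)
print(callback.line_count, callback.longest_line)
```

Keeping the results as instance attributes is one way to get a "more globally-scoped variable" without resorting to an actual global.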
Recipe: Write content to an outgoing flow file using a callback
Use Case: You want to generate content for an outgoing flow file.
Approach: Use the write(flowFile, outputStreamCallback) method from the session object. An OutputStreamCallback object is needed to pass into the write() method. Note that because OutputStreamCallback is an object, the contents are only visible to that object by default. If you need to use the data outside the write() method, use a more globally-scoped variable. The examples will write a sample String to a flowFile.
Examples:
Groovy
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
Jython
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
    def __init__(self):
        pass
    def process(self, outputStream):
        outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end
Javascript
var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Create a new OutputStreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,
new OutputStreamCallback(function(outputStream) {
outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
}));
}
JRuby
java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback
# Define a subclass of OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback
include OutputStreamCallback
def process(outputStream)
outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
end
end
jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
flowFile = session.get()
if flowFile != nil
flowFile = session.write(flowFile, jrubyOutputStreamCallback)
end
Recipe: Overwrite an incoming flow file with updated content using a callback
Use Case: You want to reuse the incoming flow file but want to modify its content for the outgoing flow file.
Approach: Use the write(flowFile, streamCallback) method from the session object. A StreamCallback object is needed to pass into the write() method. StreamCallback provides both an InputStream (from the incoming flow file) and an OutputStream (for the next version of that flow file), so you can use the InputStream to get the current contents of the flow file, then modify them and write them back out to the flow file. This overwrites the contents of the flow file, so to append you would either add the new data to the read-in contents before writing them out, or use a different approach (session.append() rather than session.write()). Note that because StreamCallback is an object, the contents are only visible to that object by default. If you need to use the data outside the write() method, use a more globally-scoped variable. The examples will reverse the contents of the incoming flowFile (assumed to be a String) and write out the reversed string to a new version of the flowFile.
Examples:
Groovy
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an inputStream and outputStream parameter to StreamCallback
flowFile = session.write(flowFile, {inputStream, outputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
Jython
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Reverse the text that was read in and write it back out
        outputStream.write(bytearray(text[::-1].encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
# implicit return at the end
Javascript
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Create a new StreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,
new StreamCallback(function(inputStream, outputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
}));
}
JRuby
java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback
# Define a subclass of StreamCallback for use in session.write()
class JRubyStreamCallback
include StreamCallback
def process(inputStream, outputStream)
text = IOUtils.toString(inputStream)
outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8))
end
end
jrubyStreamCallback = JRubyStreamCallback.new
flowFile = session.get()
if flowFile != nil
flowFile = session.write(flowFile, jrubyStreamCallback)
end
Recipe: Handle errors during script processing
Use Case: An error occurs in the script (either by data validation or a thrown exception), and you want the script to handle it gracefully.
Approach: For exceptions, use the exception-handling mechanism of the scripting language (often a try/catch block). For data validation, you can use a similar approach, but define a boolean variable like "valid" and use an if/else clause rather than a try/catch clause. ExecuteScript defines "success" and "failure" relationships; often your processing will transfer "good" flow files to success and "bad" flow files to failure (logging an error in the latter case).
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
try {
// Something that might throw an exception here
// Last operation is transfer to success (failures handled in the catch block)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
}
Jython
import sys
flowFile = session.get()
if(flowFile != None):
    try:
        # Something that might throw an exception here
        # Last operation is transfer to success (failures handled in the except block)
        session.transfer(flowFile, REL_SUCCESS)
    except:
        # A bare except catches Python and Java exceptions alike; fetch the exception explicitly
        e = sys.exc_info()[1]
        log.error('Something went wrong: ' + str(e))
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
Javascript
var flowFile = session.get();
if(flowFile != null) {
try {
// Something that might throw an exception here
// Last operation is transfer to success (failures handled in the catch block)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
}
}
JRuby
flowFile = session.get()
if flowFile != nil
begin
# Something that might raise an exception here
# Last operation is transfer to success (failures handled in the rescue block)
session.transfer(flowFile, REL_SUCCESS)
rescue Exception => e
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
end
end
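The data-validation variant mentioned in the approach (a boolean "valid" plus if/else instead of try/catch) has no example above, so here is a minimal plain-Python sketch of the idea. It does not use the NiFi API: `is_valid()` and its rules are hypothetical, and `route()` simply returns the name of the relationship a script would transfer the flow file to.

```python
def is_valid(record):
    """Hypothetical validation rule: a non-empty name and a non-negative integer age."""
    name_ok = bool(record.get('name'))
    age = record.get('age')
    age_ok = isinstance(age, int) and age >= 0
    return name_ok and age_ok


def route(record):
    """Return the name of the relationship the record would be sent to."""
    valid = is_valid(record)
    if valid:
        return 'success'
    else:
        # In a real script this is where log.error(...) would be called
        return 'failure'


print(route({'name': 'Joe', 'age': 42}))
print(route({'name': '', 'age': 42}))
print(route({'name': 'Mary', 'age': -1}))
```

In an actual ExecuteScript script the two branches would call session.transfer() with REL_SUCCESS or REL_FAILURE respectively.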
Hopefully this article has described the basics of FlowFile I/O and error handling, but suggestions and improvements are always welcome! In the next article in the series, I will discuss some more advanced features such as dynamic properties, modules, state management, and accessing/using Controller Services. Until then, cheers!
12-28-2016
07:26 PM
30 Kudos
This article describes various "recipes" on how to accomplish certain tasks with the NiFi processor ExecuteScript, with examples given in Groovy, Jython, Javascript (Nashorn), and JRuby. Recipes in this article series include:
Part 1 - Introduction to the NiFi API and FlowFiles
Getting a flow file from an incoming queue
Creating new flow files
Working with flow file attributes
Transferring a flow file
Logging
Part 2 - FlowFile I/O and Error Handling
Reading from a flow file
Writing to a flow file
Reading and writing to/from a flow file
Error Handling
Part 3 - Advanced Features
Using Dynamic Properties
Adding Modules
State Management
Accessing Controller Services
Introduction
ExecuteScript is a versatile processor that allows the user to code custom logic in a programming language that will be executed each time the ExecuteScript processor is triggered. The following variable bindings are provided to the script to enable access to NiFi components:
session: This is a reference to the ProcessSession assigned to the processor. The session allows you to perform operations on flow files such as create(), putAttribute(), and transfer(), as well as read() and write().
context: This is a reference to the ProcessContext for the processor. It can be used to retrieve processor properties, relationships, Controller Services, and the StateManager.
log: This is a reference to the ComponentLog for the processor. Use it to log messages to NiFi, such as log.info('Hello world!')
REL_SUCCESS: This is a reference to the "success" relationship defined for the processor. It could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.
REL_FAILURE: This is a reference to the "failure" relationship defined for the processor. As with REL_SUCCESS, it could also be inherited by referencing the static member of the parent class (ExecuteScript), but some engines such as Lua do not allow for referencing static members, so this is a convenience variable. It also saves having to use the fully-qualified name for the relationship.
Dynamic Properties: Any dynamic properties defined in ExecuteScript are passed to the script engine as variables set to the PropertyValue object corresponding to the dynamic property. This allows you to get the String value of the property, but also to evaluate the property with respect to NiFi Expression Language, cast the value as an appropriate data type (such as Boolean), etc. Because the dynamic property name becomes the variable name for the script, you must be aware of the variable naming rules for the chosen script engine. For example, Groovy does not allow periods (.) in variable names, so an error will occur if "my.property" is used as a dynamic property name.
Interaction with these variables is done via the NiFi Java API; each recipe below will discuss the relevant API calls as they are introduced. The recipes in the following section perform various functions on flow files, such as reading/writing attributes, transferring to a relationship, logging, etc. Please note that the examples are snippets and do not run as-is. For example, if a flow file has been retrieved from the queue with session.get(), it must be transferred to a relationship or removed, or else an error will occur. The snippets are meant to be plain and clear to illustrate only the concept(s) presented, without the addition of boilerplate code to make them working examples. In a later article I will put them all together to show full working scripts that perform useful tasks.
Recipes
Recipe: Get an incoming flow file from the session
Use Case: You have incoming connection(s) to ExecuteScript and want to retrieve one flow file from the queue(s) for processing.
Approach: Use the get() method from the session object. This method returns the next highest-priority FlowFile to process. If there is no FlowFile to process, the method will return null. Note that it is possible to have null returned even if there is a steady flow of FlowFiles into the processor. This can happen if there are multiple concurrent tasks for the processor, and the other task(s) have already retrieved the FlowFiles. If the script requires a FlowFile to continue processing, then it should immediately return if null is returned from session.get().
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
Jython
flowFile = session.get()
if (flowFile != None):
    # All processing code starts at this indent
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}
JRuby
flowFile = session.get()
if flowFile != nil
# All processing code goes here
end
Recipe: Get multiple incoming flow files from the session
Use Case: You have incoming connection(s) to ExecuteScript and want to retrieve multiple flow files from the queue(s) for processing.
Approach: Use the get(maxResults) method from the session object. This method returns up to maxResults FlowFiles from the work queue. If no FlowFiles are available, an empty list is returned (the method does not return null). NOTE: If multiple incoming queues are present, the behavior is unspecified in terms of whether all queues or only a single queue will be polled in a single call. Having said that, the observed behavior (for both NiFi 1.1.0+ and before) is described here.
Examples:
Groovy
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
flowFileList.each { flowFile ->
// Process each FlowFile here
}
}
Jython
flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList:
        # Process each FlowFile here
Javascript
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
for each (var flowFile in flowFileList) {
// Process each FlowFile here
}
}
JRuby
flowFileList = session.get(100)
if !(flowFileList.isEmpty())
flowFileList.each { |flowFile|
# Process each FlowFile here
}
end
Recipe: Create a new FlowFile
Use Case: You want to generate a new FlowFile to send to the next processor
Approach: Use the create() method from the session object. This method returns a new FlowFile object, which you can perform further processing on
Examples:
Groovy
flowFile = session.create()
// Additional processing here
Jython
flowFile = session.create()
# Additional processing here
Javascript
var flowFile = session.create();
// Additional processing here
JRuby
flowFile = session.create()
# Additional processing here
Recipe: Create a new FlowFile from a parent FlowFile
Use Case: You want to generate new FlowFile(s) based on an incoming FlowFile
Approach: Use the create(parentFlowFile) method from the session object. This method takes a parent FlowFile reference and returns a new child FlowFile object. The newly created FlowFile will inherit all of the parent's attributes except for the UUID. This method will automatically generate a Provenance FORK event or a Provenance JOIN event, depending on whether or not other FlowFiles are generated from the same parent before the ProcessSession is committed.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
newFlowFile = session.create(flowFile)
// Additional processing here
Jython
flowFile = session.get()
if (flowFile != None):
    newFlowFile = session.create(flowFile)
    # Additional processing here
Javascript
var flowFile = session.get();
if (flowFile != null) {
var newFlowFile = session.create(flowFile);
// Additional processing here
}
JRuby
flowFile = session.get()
if flowFile != nil
newFlowFile = session.create(flowFile)
# Additional processing here
end
Recipe: Add an attribute to a flow file
Use Case: You have a flow file to which you'd like to add a custom attribute.
Approach: Use the putAttribute(flowFile, attributeKey, attributeValue) method from the session object. This method updates the given FlowFile's attributes with the given key/value pair. NOTE: The "uuid" attribute is fixed for a FlowFile and cannot be modified; if the key is named "uuid", it will be ignored.
Also this is a good point to mention that FlowFile objects are immutable; this means that if you update a FlowFile's attributes (or otherwise alter it) via the API, you will get a new reference to the new version of the FlowFile. This is very important when it comes to transferring FlowFiles to relationships. You must keep a reference to the latest version of a FlowFile, and you must transfer or remove the latest version of all FlowFiles retrieved from or created by the session, otherwise you will get an error when executing. Most often, the variable used to store a FlowFile reference will be overwritten with the latest version returned from a method that alters the FlowFile (intermediate FlowFile references will be automatically discarded). In these examples you will see this technique of reusing a flowFile reference when adding attributes. Note that the current reference to the FlowFile is passed into the putAttribute() method. The resulting FlowFile has an attribute named 'myAttr' with a value of 'myValue'. Also note that the method takes a String for the value; if you have an Object you will have to serialize it to a String. Finally, please note that if you are adding multiple attributes, it is better to create a Map and use putAllAttributes() instead (see next recipe for details).
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
Jython
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}
JRuby
flowFile = session.get()
if flowFile != nil
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end
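To make the point about immutability concrete, here is a small plain-Python sketch. It is only an analogy and does not use the NiFi API: `FakeFlowFile` and `put_attribute()` are hypothetical stand-ins for a FlowFile and session.putAttribute(). The update returns a new object and leaves the original untouched, which is why the examples above reassign the flowFile variable.

```python
from collections import namedtuple

# Hypothetical immutable stand-in for a FlowFile: a version number plus attributes
FakeFlowFile = namedtuple('FakeFlowFile', ['version', 'attributes'])


def put_attribute(flow_file, key, value):
    """Stand-in for session.putAttribute(): returns a NEW object, never mutates."""
    new_attributes = dict(flow_file.attributes)
    new_attributes[key] = value
    return FakeFlowFile(flow_file.version + 1, new_attributes)


original = FakeFlowFile(0, {'filename': 'hello.txt'})
# Reassign the variable so it always points at the latest version
latest = put_attribute(original, 'myAttr', 'myValue')

print(original.attributes)
print(latest.attributes)
```

If the script kept using `original` after the update, it would be holding a stale version, which is the situation that causes errors at transfer time in NiFi.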
Recipe: Add multiple attributes to a flow file
Use Case: You have a flow file to which you'd like to add custom attributes.
Approach: Use the putAllAttributes(flowFile, attributeMap) method from the session object. This method updates the given FlowFile's attributes with the key/value pairs from the given Map. NOTE: The "uuid" attribute is fixed for a FlowFile and cannot be modified; if the key is named "uuid", it will be ignored.
The technique here is to create a Map (aka dictionary in Jython, hash in JRuby) of the attribute key/value pairs you'd like to update, then call putAllAttributes() on it. This is much more efficient than calling putAttribute() for each key/value pair, as the latter case will cause the framework to create a temporary version of the FlowFile for each attribute added (see above recipe for discussion on FlowFile immutability). The examples show a map of two entries myAttr1 and myAttr2, set to '1' and the language-specific coercion of the number 2 as a String (to adhere to the method signature of requiring String values for both key and value). Note that a session.transfer() is not specified here (so the code snippets below do not work as-is), see the following recipe for that.
Examples:
Groovy
attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)
Jython
attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
# implicit return at the end
Javascript
var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()
if (flowFile != null) {
flowFile = session.putAllAttributes(flowFile, attrMap)
}
JRuby
attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get()
if flowFile != nil
flowFile = session.putAllAttributes(flowFile, attrMap)
end
Recipe: Get an attribute from a flow file
Use Case: You have a flow file from which you'd like to inspect an attribute.
Approach: Use the getAttribute(attributeKey) method from the FlowFile object. This method returns the String value for the given attributeKey, or null if the attributeKey is not found. The examples show the retrieval of the value for the "filename" attribute.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')
Jython
flowFile = session.get()
if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
# implicit return at the end
Javascript
var flowFile = session.get()
if (flowFile != null) {
var myAttr = flowFile.getAttribute('filename')
}
JRuby
flowFile = session.get()
if flowFile != nil
myAttr = flowFile.getAttribute('filename')
end
Recipe: Get all attributes from a flow file
Use Case: You have a flow file from which you'd like to retrieve its attributes.
Approach: Use the getAttributes() method from the FlowFile object. This method returns a Map with String keys and String values, representing the key/value pairs of attributes for the flow file. The examples show an iteration over the Map of all attributes for a FlowFile.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
flowFile.getAttributes().each { key,value ->
// Do something with the key/value pair
}
Jython
flowFile = session.get()
if (flowFile != None):
    for key,value in flowFile.getAttributes().iteritems():
        # Do something with key and/or value
# implicit return at the end
Javascript
var flowFile = session.get()
if (flowFile != null) {
var attrs = flowFile.getAttributes();
for each (var attrKey in attrs.keySet()) {
// Do something with attrKey (the key) and/or attrs[attrKey] (the value)
}
}
JRuby
flowFile = session.get()
if flowFile != nil
flowFile.getAttributes().each { |key,value|
# Do something with key and/or value
}
end
Recipe: Transfer a flow file to a relationship
Use Case: After processing a flow file (new or incoming), you want to transfer the flow file to a relationship ("success" or "failure"). In this simple case let us assume there is a variable called "errorOccurred" that indicates to which relationship the FlowFile should be transferred. Additional error handling techniques will be discussed in part 2 of this series.
Approach: Use the transfer(flowFile, relationship) method from the session object. From the documentation: this method transfers the given FlowFile to the appropriate destination processor work queue(s) based on the given relationship. If the relationship leads to more than one destination the state of the FlowFile is replicated such that each destination receives an exact copy of the FlowFile though each will have its own unique identity.
NOTE: ExecuteScript will perform a session.commit() at the end of each execution to ensure the operations have been committed. You do not need to (and should not) perform a session.commit() within the script.
Examples:
Groovy
flowFile = session.get()
if(!flowFile) return
// Processing occurs here
if(errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
}
else {
session.transfer(flowFile, REL_SUCCESS)
}
Jython
flowFile = session.get()
if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
Javascript
var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
if(errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
}
else {
session.transfer(flowFile, REL_SUCCESS)
}
}
JRuby
flowFile = session.get()
if flowFile != nil
# All processing code goes here
if errorOccurred
session.transfer(flowFile, REL_FAILURE)
else
session.transfer(flowFile, REL_SUCCESS)
end
end
Recipe: Send a message to the log at a specified logging level
Use Case: You want to report some event that has occurred during processing to the logging framework.
Approach: Use the log variable with the warn(), trace(), debug(), info(), or error() methods. These methods can take a single String, or a String followed by an array of Objects, or a String followed by an array of Objects followed by a Throwable. The first one is used for simple messages. The second is used when you have some dynamic objects/values that you want to log. To refer to these in the message string use "{}" in the message. These are evaluated against the Object array in order of appearance, so if the message reads "Found these things: {} {} {}" and the Object array is ['Hello',1,true], then the logged message will be "Found these things: Hello 1 true". The third form of these logging methods also takes a Throwable parameter, and is useful when an exception is caught and you want to log it.
Examples:
Groovy
log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])
Jython
from java.lang import Object
from jarray import array
objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)
Javascript
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)
JRuby
log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)
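The in-order "{}" substitution described in the approach can be mimicked in a few lines of plain Python. This is a simplified stand-in written for illustration, not the logger's actual implementation: `format_message()` is a hypothetical helper, and the handling of leftover placeholders and of booleans (rendered Java-style as true/false) are assumptions of the sketch.

```python
def format_message(message, args):
    """Replace each '{}' in message with the next item from args, in order."""
    parts = message.split('{}')
    out = [parts[0]]
    for i, part in enumerate(parts[1:]):
        if i < len(args):
            value = args[i]
            # Render booleans the way Java would print them (true/false)
            if isinstance(value, bool):
                value = 'true' if value else 'false'
            out.append(str(value))
        else:
            # No argument left for this placeholder: keep it as-is
            out.append('{}')
        out.append(part)
    return ''.join(out)


print(format_message('Found these things: {} {} {}', ['Hello', 1, True]))
```

With the message and array from the approach above, the sketch yields the same text as the logged message described there.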
Hopefully these snippets are helpful to illustrate bits of the NiFi API in the context of various scripting languages and flow file operations. I'll put some of these recipes together in a future article, to show some examples of end-to-end scripts. For more examples, use cases, and explanations, please check out my blog. In the next article in this series, I'll talk about reading from and writing to the contents of flow files, as well as discuss error handling techniques. Cheers!
12-15-2016
04:29 PM
13 Kudos
In NiFi/HDF, it is possible to create a kind of lookup table of key/value pairs using a DistributedMapCacheServer. The DistributedMapCacheServer is used by various processors such as GetHBase and DetectDuplicate. It can also be leveraged by users in their flows, with the PutDistributedMapCache and FetchDistributedMapCache processors, by specifying a corresponding DistributedMapCacheClientService. Sometimes, however, it might be the case that the user would like to interact with the DistributedMapCacheServer programmatically (and external to NiFi), say for removing specific entries, inserting/populating entries, etc. To that end I have written a Groovy script (dcachegroovy.txt, rename to dcache.groovy) to allow manipulation of the entries in a DistributedMapCacheServer from the command line. The usage is as follows:
Usage: groovy dcache.groovy <hostname> <port> <command> <args>
Where <command> is one of the following:
get: Retrieves the values for the keys (provided as arguments)
remove: Removes the keys (specified as arguments)
put: Sets the given keys to the given values, specified as arguments in the form: key1 value1 key2 value2 ... keyN valueN
So to insert entries "a = Hello" and "b = World" (assuming a local DistributedMapCacheServer at the default port), you can enter:
groovy dcache.groovy localhost 4557 put a Hello b World
Which outputs the following:
Set a = Hello
Set b = World
Then to retrieve the values:
groovy dcache.groovy localhost 4557 get a b
Which gives:
a = Hello
b = World
To remove an entry:
groovy dcache.groovy localhost 4557 remove b
Giving:
Removed b
Trying the get again for both values (where b no longer exists):
groovy dcache.groovy localhost 4557 get a b
Gives:
a = Hello
b =
This script can be used to pre-populate, clear, or inspect a DistributedMapCacheServer. I'd be interested to hear if you try it, whether you find it useful or not, and of course all suggestions for improvements are welcome. Cheers!
08-16-2016
09:56 PM
21 Kudos
NiFi is most effectively used as an "always-on" system, meaning that the data flows are usually running continuously. Batch processing is a more difficult task and usually requires some user intervention (such as stopping a source processor).
For relational databases (RDBMS), a common use case is to migrate, replicate, or otherwise move the data from the source RDBMS to some target. If ExecuteSQL were used to get all data from a table (for example), then the processor will execute the query each time it is triggered to run, and will return whatever results correspond to the query.
If the goal is to simply move all rows to some target, then ExecuteSQL could be started then immediately stopped, such that it would only execute once, and all results will be sent down the flow.
However, a more common use case is that the source database is being updated by some external process (user, webapp, ERP/CRM/etc. system). In order to get the new rows, the table needs to be queried again. But if the "old" rows have already been moved, re-running the same query means many duplicate rows would continue to be processed in the flow.
As an alternative the QueryDatabaseTable processor allows you to specify column(s) in a table that are increasing in value (such as an "ID" or "timestamp" column), and the processor will only retrieve rows from the table whose values in those columns are greater than the maximum value(s) observed so far.
To illustrate, consider the following database table called "users":
| id | name | email | age |
|---|---|---|---|
| 1 | Joe | joe@fakemail.com | 42 |
| 2 | Mary | mary@fakemail.com | 24 |
| 3 | Matt | matt@fakemail.com | 38 |
Here, QueryDatabaseTable would be configured to use a table name of "users" and a "Maximum-Value Column" of "id". When the processor runs the first time, it will not have seen any values from the "id" column and thus all rows will be returned. The query executed is:
SELECT * FROM users
However after that query has completed, QueryDatabaseTable stores the maximum value for the "id" column that it has seen; namely, 3.
Now let's say QueryDatabaseTable has been scheduled to run every 5 minutes, and the next time it runs, the table looks like this:
| id | name | email | age |
|---|---|---|---|
| 1 | Joe | joe@fakemail.com | 42 |
| 2 | Mary | mary@fakemail.com | 24 |
| 3 | Matt | matt@fakemail.com | 38 |
| 4 | Armando | armando@fakemail.com | 20 |
| 5 | Jim | jeff@fakemail.com | 30 |
Because QueryDatabaseTable had stored the maximum value of 3 for the "id" column, this time when the processor runs it executes the following query:
SELECT * FROM users WHERE id > 3
Now you will see just the last two rows (the recently added ones) are returned, and QueryDatabaseTable has stored the new maximum value of 5.
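The maximum-value bookkeeping described above can be reproduced in a few lines of plain Python with an in-memory SQLite database. This is only a sketch of the idea, not how QueryDatabaseTable is implemented: the `state` dictionary is a hypothetical stand-in for the processor's stored state, `fetch_new_rows()` is an illustrative helper, and the ORDER BY clause is added here just to make the output deterministic.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (id INTEGER, name TEXT, email TEXT, age INTEGER)')
conn.executemany('INSERT INTO users VALUES (?, ?, ?, ?)', [
    (1, 'Joe', 'joe@fakemail.com', 42),
    (2, 'Mary', 'mary@fakemail.com', 24),
    (3, 'Matt', 'matt@fakemail.com', 38),
])

state = {'max_id': None}  # stand-in for the processor's stored state


def fetch_new_rows(conn, state):
    """Return rows not seen before and remember the largest id observed."""
    if state['max_id'] is None:
        # First run: nothing seen yet, so fetch everything
        rows = conn.execute('SELECT * FROM users ORDER BY id').fetchall()
    else:
        rows = conn.execute('SELECT * FROM users WHERE id > ? ORDER BY id',
                            (state['max_id'],)).fetchall()
    if rows:
        state['max_id'] = max(row[0] for row in rows)
    return rows


first = fetch_new_rows(conn, state)

conn.executemany('INSERT INTO users VALUES (?, ?, ?, ?)', [
    (4, 'Armando', 'armando@fakemail.com', 20),
    (5, 'Jim', 'jeff@fakemail.com', 30),
])
second = fetch_new_rows(conn, state)

print(len(first), len(second), state['max_id'])
```

A third call without new inserts returns no rows, which corresponds to the flow sitting idle until new data arrives.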
Here is the concept applied to a NiFi flow:
For this example, I have a "users" table containing many attributes about randomly-generated users, and there are 100 records to start with:
If we run QueryDatabaseTable, we see that after the SplitAvro we get 100 separate flow files:
The flow remains this way until new data comes in (with an "id" value larger than 100). Adding one:
Once this row is added, you can see the additional record move through the flow:
This approach works for any column that has increasing values, such as timestamps. If you want to clear out the maximum value(s) from the processor's state, right click on the processor and choose View State. Then you can click "Clear State" on the dialog:
Hopefully this article has shown how to use QueryDatabaseTable to do incremental fetching of database tables in NiFi. Please let me know how/if this works for you. Cheers!