Created on 02-01-2016 10:00 PM - edited 08-17-2019 01:22 PM
Incorporating the Apache NiFi Receiver into your Spark application is pretty easy. First, you'll need to add the Receiver to your application's POM:
<dependency>
  <groupId>org.apache.nifi</groupId>
  <artifactId>nifi-spark-receiver</artifactId>
  <version>0.4.1</version>
</dependency>
That's all that is needed in order to be able to use the NiFi Receiver. So now we'll look at how to use the Receiver in your code.
The NiFi Receiver is a Reliable Java Receiver. This means that if we lose a node after it pulls the data from NiFi, the data will not be lost. Instead, another node will simply pull and process the data. In order to create a NiFi Receiver, we need to first create a configuration that tells the Receiver where to pull the data from. The simplest form is to just tell the config where NiFi is running and which Port to pull the data from:
SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  .url("http://localhost:8080/nifi")
  .portName("Data For Spark")
  .buildConfig();
To briefly explain the terminology here, NiFi refers to its mechanism for transferring data between clusters or instances as Site-to-Site. It exposes options for pushing data or pulling, so that the most appropriate approach can be used for each situation. Spark doesn't supply a mechanism to have data pushed to it - instead, it wants to pull data from other sources. In NiFi, this data can be exposed in such a way that a receiver can pull from it by adding an Output Port to the root process group. For Spark, we will use this same data transport mechanism - we will use the Site-to-Site protocol to pull data from NiFi's Output Ports. In order for this to work, we need two pieces of information: the URL to connect to NiFi and the name of the Output Port to pull data from.
If the NiFi instance to connect to is clustered, the URL should be that of the NiFi Cluster Manager. The Receiver will then contact the Cluster Manager to determine which nodes are in the cluster and start pulling data from all of them. It also determines which nodes have the most data backed up and pulls from those nodes more heavily than from the others. This information is refreshed periodically, so the Receiver adjusts as nodes join or leave the cluster, or as nodes become more or less bogged down.
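To make the idea of pulling "more heavily" from busier nodes a little more concrete, here is a small, self-contained sketch of one way a client could weight nodes by their backlog. It is only an illustration and is not the Receiver's actual selection logic; the class name WeightedNodePicker, the node names, and the queue counts are all made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration only: this is NOT the NiFi Receiver's real selection code.
class WeightedNodePicker {

    // Picks a node with probability proportional to its queued FlowFile count.
    // Assumes all counts are non-negative.
    static String pick(Map<String, Integer> queuedByNode, Random random) {
        int total = 0;
        for (int queued : queuedByNode.values()) {
            total += queued;
        }
        if (total <= 0) {
            throw new IllegalArgumentException("no queued data on any node");
        }
        int roll = random.nextInt(total); // 0 <= roll < total
        for (Map.Entry<String, Integer> entry : queuedByNode.entrySet()) {
            roll -= entry.getValue();
            if (roll < 0) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("unreachable when counts are non-negative");
    }

    public static void main(String[] args) {
        // Made-up backlog: node-1 holds 90% of the queued data.
        Map<String, Integer> queued = new LinkedHashMap<>();
        queued.put("node-1", 900);
        queued.put("node-2", 100);

        Random random = new Random(42L);
        int node1Picks = 0;
        for (int i = 0; i < 10_000; i++) {
            if (pick(queued, random).equals("node-1")) {
                node1Picks++;
            }
        }
        System.out.println("node-1 chosen " + node1Picks + " of 10000 times");
    }
}
```

In this sketch a node with no backlog is never chosen, and re-reading the counts before each pick would mimic the periodic refresh described above.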
Next, we need to instruct the Receiver which Port to pull data from. Since NiFi can have many different Output Ports, we need to provide either a Port Identifier or a Port Name. If desired, we can also configure communications timeouts; SSL information for secure data transfer, authentication, and authorization; compression; and preferred batch sizes. See the JavaDocs for the SiteToSiteClient.Builder for more information.
Once we have constructed this configuration object, we can now create the Receiver:
SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));

// Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
// the specified Port
JavaReceiverInputDStream<NiFiDataPacket> packetStream =
    ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));
This gives us a JavaReceiverInputDStream of type NiFiDataPacket. The NiFiDataPacket is a simple construct that packages an arbitrary byte array (raw data) with a map of Key/Value pairs (referred to as attributes) that correspond to the data. As an example, we can process the data without paying attention to the attributes:
// Map the data from NiFi to text, ignoring the attributes
JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
  public String call(final NiFiDataPacket dataPacket) throws Exception {
    return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
  }
});
Or we can make use of the attributes:
// Extract the 'uuid' attribute
JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
  public String call(final NiFiDataPacket dataPacket) throws Exception {
    return dataPacket.getAttributes().get("uuid");
  }
});
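If you would like to try this mapping logic without a running Spark or NiFi instance, the sketch below may help. It uses a hypothetical stand-in class, SimplePacket, which is not part of NiFi and only mirrors the getContent() and getAttributes() accessors used above; the attribute value is made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class PacketMappingDemo {

    // Hypothetical stand-in for NiFiDataPacket: raw bytes plus a map of attributes.
    static final class SimplePacket {
        private final byte[] content;
        private final Map<String, String> attributes;

        SimplePacket(byte[] content, Map<String, String> attributes) {
            this.content = content;
            this.attributes = attributes;
        }

        byte[] getContent() {
            return content;
        }

        Map<String, String> getAttributes() {
            return attributes;
        }
    }

    // Same logic as the first map() call: decode the raw bytes as UTF-8 text.
    static String toText(SimplePacket packet) {
        return new String(packet.getContent(), StandardCharsets.UTF_8);
    }

    // Same logic as the second map() call: read the 'uuid' attribute.
    static String toUuid(SimplePacket packet) {
        return packet.getAttributes().get("uuid");
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("uuid", "example-uuid"); // made-up value for illustration
        SimplePacket packet =
                new SimplePacket("hello".getBytes(StandardCharsets.UTF_8), attributes);

        System.out.println(toText(packet));
        System.out.println(toUuid(packet));
    }
}
```

Keeping the per-packet logic in small static methods like these makes it easy to unit test before wiring it into the stream.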
So now we have our Receiver ready to pull data from NiFi. Let's look at how we can configure NiFi to expose this data.
First, NiFi has to be configured to allow site-to-site communication. This is accomplished by setting the nifi.remote.input.socket.port property in the nifi.properties file to the desired port to use for site-to-site (if this value is changed, it will require a restart of NiFi for the changes to take effect).
Now that NiFi is set up to allow site-to-site, we will build a simple flow to feed data to Spark. We will start by adding two GetFile processors to the flow. One will pick up from /data/in/notifications and the other will pick up from /data/in/analysis:
Next, let's assume that we want to assign different priorities to the data that is picked up from each directory. Let's assign a priority of "1" (the highest priority) to data coming from the analysis directory and assign a lower priority to the data from the notifications directory. We can use the UpdateAttribute processor to do this. Drag the Processor onto the graph and configure it. In the Settings tab, we set the name to "Set High Priority". In the Properties tab, we have no properties available. However, there's a button in the top-right of the dialog that says "New Property."
Clicking that button lets us add a property with the name priority and a value of 1. When we click OK, we see it added to the table:
Let's click “Apply” and add another UpdateAttribute processor for the data coming from the notifications directory. Here, we will add a property with the name priority but give it a value of 2. After configuring these processors and connecting the GetFile processors to them, we end up with a graph that looks like this:
Now, we want to combine all of the data into a single queue so that we can prioritize it before sending the data to Spark. We do this by adding a Funnel to the graph and connecting both of the UpdateAttribute processors to the Funnel.
Now we can add an Output Port that our Spark Streaming application can pull from. We drag an Output Port onto our graph. When prompted for a name, we will name it Data For Spark, as this is the port name that we configured in our Spark Streaming application. Once we connect the Funnel to the Output Port, we have a graph like this:
We haven't actually told NiFi to prioritize the data yet, though. We've simply added an attribute named priority. To prioritize the data based on that, we can right-click on the connection that feeds the Output Port and choose Configure. From the Settings tab, we can drag the PriorityAttributePrioritizer from the list of Available Prioritizers to the list of Selected Prioritizers:
Once we click Apply, we're done. We can start all of the components and we should see the data start flowing to our Spark Streaming application:
Now any data that appears in our /data/in/notifications or /data/in/analysis directory will make its way to our streaming application!
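To illustrate what the prioritization step does to the queue, here is a simplified model of ordering by the priority attribute, where a lower number is served first (matching the "1" before "2" setup above). This is a sketch and not NiFi's PriorityAttributePrioritizer source; it assumes integer priority values, and sorting items with a missing or non-numeric attribute last is an assumption made for this example. The file names are made up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Simplified model of a prioritized queue; not NiFi's actual implementation.
class PriorityQueueSketch {

    // Lower number means higher priority.
    // Assumption: a missing or non-numeric 'priority' attribute sorts last.
    static int priorityOf(Map<String, String> attributes) {
        String value = attributes.get("priority");
        if (value == null) {
            return Integer.MAX_VALUE;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return Integer.MAX_VALUE;
        }
    }

    // Returns the 'filename' attributes in the order the queue would release them.
    static List<String> drainInPriorityOrder(List<Map<String, String>> flowFiles) {
        Comparator<Map<String, String>> byPriority =
                Comparator.comparingInt(PriorityQueueSketch::priorityOf);
        PriorityQueue<Map<String, String>> queue = new PriorityQueue<>(byPriority);
        queue.addAll(flowFiles);

        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().get("filename"));
        }
        return order;
    }

    // Builds an attribute map; pass null to leave the priority attribute unset.
    static Map<String, String> flowFile(String filename, String priority) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", filename);
        if (priority != null) {
            attributes.put("priority", priority);
        }
        return attributes;
    }

    public static void main(String[] args) {
        List<Map<String, String>> incoming = new ArrayList<>();
        incoming.add(flowFile("notification.txt", "2"));
        incoming.add(flowFile("analysis.txt", "1"));
        incoming.add(flowFile("untagged.txt", null));

        System.out.println(drainInPriorityOrder(incoming));
    }
}
```

Note that a Java PriorityQueue does not promise any particular order among items with equal priority, so this model says nothing about how ties are broken.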
By using NiFi's Output Port mechanism, we are able to create any number of different named Output Ports, as well. This allows you, as a NiFi user, to choose exactly which data gets exposed to Spark. Additionally, if NiFi is configured to be secure, each Output Port can be configured to provide the data to only the hosts and users that are authorized.
Let's consider, though, that this data has significant value for processing in a streaming fashion as soon as we have the data, but it may also be of value to a batch processing analytic. This is easy to handle, as well. We can add a MergeContent processor to our graph and configure it to merge data into 64-128 MB TAR files. Then, when we have a full TAR file, we can push the data to HDFS. We can configure MergeContent to make these bundles like so:
We can then send the merged files to PutHDFS and auto-terminate the originals. We can feed all the data from our Funnel to this MergeContent processor, and this will allow us to dual-route all data to both HDFS (bundled into 64-128 MB TAR files) and to Spark Streaming (making the data available as soon as possible with very low latency):
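As a rough illustration of the 64-128 MB bundling idea, the sketch below accumulates incoming file sizes and closes a bundle as soon as it reaches the minimum size. It is a simplified model and not MergeContent's actual binning logic: it assumes every file is no larger than the gap between the maximum and minimum sizes, which guarantees that no bundle exceeds the maximum, and it simply leaves any undersized remainder pending. The class name and file sizes are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-based bundling; not MergeContent's real algorithm.
class BundleSketch {

    static final long MB = 1024L * 1024L;

    // Returns the sizes (in bytes) of the completed bundles.
    // Data that has not yet reached minBytes stays pending and is not returned.
    static List<Long> bundle(long[] fileSizes, long minBytes, long maxBytes) {
        List<Long> bundles = new ArrayList<>();
        long current = 0L;
        for (long size : fileSizes) {
            // Assumption: each file fits in the gap between min and max, so
            // adding it to a bundle still below min can never exceed max.
            if (size < 0 || size > maxBytes - minBytes) {
                throw new IllegalArgumentException("file size out of range: " + size);
            }
            current += size;
            if (current >= minBytes) {
                bundles.add(current); // bundle is now between min and max
                current = 0L;
            }
        }
        return bundles;
    }

    public static void main(String[] args) {
        // Ten made-up 20 MB files.
        long[] sizes = new long[10];
        for (int i = 0; i < sizes.length; i++) {
            sizes[i] = 20 * MB;
        }
        for (long bundleSize : bundle(sizes, 64 * MB, 128 * MB)) {
            System.out.println((bundleSize / MB) + " MB bundle");
        }
    }
}
```

With these inputs, each bundle closes after four files, and the last two files remain pending until more data arrives.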
We would love to hear any questions, comments, or feedback that you may have!
Mark Payne Senior Member of Technical Staff at Hortonworks
LinkedIn: https://www.linkedin.com/in/dataflowmark