Created on 01-31-2016 08:46 PM - edited 08-17-2019 01:20 PM
Spark doesn't supply a mechanism to have data pushed to it - instead, it wants to pull data from other sources. In NiFi, this data can be exposed in such a way that a receiver can pull from it by adding an Output Port to the root process group. For Spark, we will use this same mechanism - we will use the Site-to-Site protocol to pull data from NiFi's Output Ports.
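Under the hood, the Spark receiver is just a Site-to-Site client pulling from that Output Port. As a rough sketch of what that pull looks like with the plain nifi-site-to-site-client API (the URL and the port name 'spark' match what we configure in the steps below):

import java.nio.charset.StandardCharsets
import org.apache.nifi.remote.TransferDirection
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.stream.io.StreamUtils

object SiteToSitePullSketch {
  def main(args: Array[String]) {
    // Point the client at the NiFi instance and the Output Port to pull from
    val client = new SiteToSiteClient.Builder()
      .url("http://localhost:8090/nifi")
      .portName("spark")
      .build()
    // Open a RECEIVE transaction and drain whatever FlowFiles are queued at the port
    val transaction = client.createTransaction(TransferDirection.RECEIVE)
    var packet = transaction.receive()
    while (packet != null) {
      // Each DataPacket exposes the FlowFile's attributes and content stream
      val buffer = new Array[Byte](packet.getSize.toInt)
      StreamUtils.fillBuffer(packet.getData, buffer)
      println(new String(buffer, StandardCharsets.UTF_8))
      packet = transaction.receive()
    }
    // Confirm receipt so NiFi can drop the FlowFiles, then clean up
    transaction.confirm()
    transaction.complete()
    client.close()
  }
}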
1) Assuming you already have the latest version of NiFi (0.4.1 / HDF 1.1.1) downloaded on your HW Sandbox; if not, execute the below after SSH connectivity to the sandbox is established:
# cd /opt/
# wget http://public-repo-1.hortonworks.com/HDF/1.1.1.0/nifi-1.1.1.0-12-bin.tar.gz
# tar -xvf nifi-1.1.1.0-12-bin.tar.gz
2) Download the compatible version (in our case 0.4.1) of "nifi-spark-receiver" and "nifi-site-to-site-client" to a specific location on the sandbox:
# mkdir /opt/spark-receiver
# cd /opt/spark-receiver
# wget http://central.maven.org/maven2/org/apache/nifi/nifi-site-to-site-client/0.4.1/nifi-site-to-site-client-0.4.1.jar
# wget http://central.maven.org/maven2/org/apache/nifi/nifi-spark-receiver/0.4.1/nifi-spark-receiver-0.4.1.jar
3) Configure Spark to load the required NiFi libraries: edit spark-defaults.conf to add the jars to the classpath, appending the below lines at the bottom:
# vi /usr/hdp/current/spark-client/conf/spark-defaults.conf

spark.driver.extraClassPath /opt/spark-receiver/nifi-spark-receiver-0.4.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.4.1.jar:/opt/nifi-1.1.1.0-12/lib/nifi-api-1.1.1.0-12.jar:/opt/nifi-1.1.1.0-12/lib/bootstrap/nifi-utils-1.1.1.0-12.jar:/opt/nifi-1.1.1.0-12/work/nar/framework/nifi-framework-nar-1.1.1.0-12.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-1.1.1.0-12.jar
spark.driver.allowMultipleContexts = true
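Optionally, you can sanity-check the classpath before going further: start spark-shell and try importing the classes we will use later. If the jars were picked up, the imports resolve without errors:

# spark-shell
scala> import org.apache.nifi.spark.NiFiReceiver
scala> import org.apache.nifi.remote.client.SiteToSiteClient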
4) Open nifi.properties to update its configuration:
# vi /opt/nifi-1.1.1.0-12/conf/nifi.properties
5) Change the NiFi HTTP port to 8090, as the default 8080 will conflict with the Ambari web UI:
# web properties #
nifi.web.http.port=8090
6) Configure the NiFi instance to run Site-to-Site by changing the below configuration: add a port, say 8055, and set "nifi.remote.input.secure" to "false":
# Site to Site properties
nifi.remote.input.socket.host=
nifi.remote.input.socket.port=8055
nifi.remote.input.secure=false
7) Now start NiFi on your sandbox (restart it if it is already running, so the configuration changes take effect):
# /opt/nifi-1.1.1.0-12/bin/nifi.sh start
8) Let us build a small flow on the NiFi canvas that reads the app log generated by NiFi itself and feeds it to Spark:
a) Connect to the below URL in your browser: http://<your_vm_ip>:8090/nifi/
b) Drop an "ExecuteProcess" Processor to canvas [or you can use TailFile Processor] to read lines added to "nifi-app.log". Auto Terminate relationship Failure. The configuration on the processor would look like below:
c) Drop an Output Port onto the canvas and name it 'spark'. Once added, connect "ExecuteProcess" to the port for the Success relationship. This simple flow will look like below:
9) Now let's go back to the VM command line and create the Scala application to pull data from the NiFi output port we just created. Change directory to "/opt/spark-receiver" and create a shell script file "spark-data.sh":
# cd /opt/spark-receiver
# vi spark-data.sh
10) Add the below lines, which the application needs to pull data from the NiFi output port, to the script file and save it:
// Import all the libraries required
import org.apache.nifi._
import java.nio.charset._
import org.apache.nifi.spark._
import org.apache.nifi.remote._
import org.apache.nifi.remote.client._
import org.apache.nifi.remote.protocol._
import org.apache.nifi.events._
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.receiver._
import org.apache.spark.serializer._
import java.io._

object SparkNiFiAttribute {
  def main(args: Array[String]) {
    // Build a Site-to-Site client config with the NiFi web URL and output port name [spark, created in step 8c]
    val conf = new SiteToSiteClient.Builder().url("http://localhost:8090/nifi").portName("spark").buildConfig()
    // Set an App Name
    val config = new SparkConf().setAppName("Nifi_Spark_Data")
    // Create a StreamingContext
    val ssc = new StreamingContext(config, Seconds(10))
    // Create a DStream using a NiFi receiver so that we can pull data from the specified Port
    val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
    // Map the data from NiFi to text, ignoring the attributes
    val text = lines.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
    // Print the first ten elements of each RDD generated
    text.print()
    // Start the computation
    ssc.start()
  }
}
SparkNiFiAttribute.main(Array())
11) Let's go back to the NiFi Web UI and start the flow we created. Make sure nothing is wrong, and you should see data flowing.
12) Now load the script into spark-shell with the below command and start streaming:
# spark-shell -i spark-data.sh
13) In the screenshot below, you can see the NiFi logs being pulled and printed on the console:
14) In the same way, we can pull data from NiFi and extract the associated attributes:
// Import all the libraries required
import org.apache.nifi._
import java.nio.charset._
import org.apache.nifi.spark._
import org.apache.nifi.remote._
import org.apache.nifi.remote.client._
import org.apache.nifi.remote.protocol._
import org.apache.nifi.events._
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.receiver._
import org.apache.spark.serializer._
import java.io._

object SparkNiFiData {
  def main(args: Array[String]) {
    // Build a Site-to-Site client config with the NiFi web URL and output port name
    val conf = new SiteToSiteClient.Builder().url("http://localhost:8090/nifi").portName("spark").buildConfig()
    // Set an App Name
    val config = new SparkConf().setAppName("Nifi_Spark_Attributes")
    // Create a StreamingContext
    val ssc = new StreamingContext(config, Seconds(5))
    // Create a DStream using a NiFi receiver so that we can pull data from the specified Port
    val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
    // Extract the 'uuid' attribute from each FlowFile
    val text = lines.map(dataPacket => dataPacket.getAttributes.get("uuid"))
    // Print the first ten elements of each RDD generated
    text.print()
    // Start the computation and keep it running
    ssc.start()
    ssc.awaitTermination()
  }
}
SparkNiFiData.main(Array())
15) In the screenshot below, you can see the FlowFile attribute "uuid" being extracted and printed on the console:
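If you need the content and the attributes together, you can combine them in a single map. A minimal sketch, reusing the 'lines' DStream from the script above and assuming the standard 'filename' FlowFile attribute is present:

// Pair each FlowFile's 'filename' attribute with its content
val combined = lines.map(dataPacket =>
  dataPacket.getAttributes.get("filename") + " : " +
  new String(dataPacket.getContent, StandardCharsets.UTF_8))
combined.print()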
16) You can create multiple Output Ports to transmit data to different Spark applications from the same NiFi instance at the same time, as sketched below.
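For instance, two receivers pointed at two ports can run in the same StreamingContext. A minimal sketch, reusing the 'ssc' StreamingContext from the scripts above, where 'spark2' is a hypothetical second Output Port:

// One Site-to-Site config per NiFi Output Port
val confA = new SiteToSiteClient.Builder().url("http://localhost:8090/nifi").portName("spark").buildConfig()
val confB = new SiteToSiteClient.Builder().url("http://localhost:8090/nifi").portName("spark2").buildConfig()

// One receiver per port; union the streams if they should be processed together
val streamA = ssc.receiverStream(new NiFiReceiver(confA, StorageLevel.MEMORY_ONLY))
val streamB = ssc.receiverStream(new NiFiReceiver(confB, StorageLevel.MEMORY_ONLY))
val merged = streamA.union(streamB)
merged.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)).print()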
Thanks,
Jobin George
Created on 11-05-2017 02:54 PM
Hi,
I got an exception:
unable to create serializer "com.esotericsoftware.kryo.serializer" for class: org.apache.nifi.remote.client.SiteToSiteClient$StandardSiteToSiteClientConfig
What could be the problem?
Created on 10-07-2022 08:42 PM
Hi, I have been following your instructions.
If I want to do the same thing but with PySpark, will the code be similar to this?