06-09-2017 09:00 AM
To run the code in IntelliJ, the code above is fine; I only needed to add ssc.awaitTermination() after ssc.start().
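So the end of main becomes (a minimal sketch of the change):

// Start the computation and keep the streaming context alive until it is stopped
ssc.start()
ssc.awaitTermination()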
To run it in the shell, I needed to create a fat jar (uber/standalone jar). The missing import org.apache.nifi.events._ was available in nifi-framework-api-1.2.0.jar.
I used Maven to create the fat jar with the maven-assembly-plugin.
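In case it helps someone else, the plugin section of my pom.xml looks roughly like this (a sketch; jar-with-dependencies is the standard assembly descriptor for building a standalone jar):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- bundle all dependencies into a single runnable jar -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <!-- build the fat jar during the package phase -->
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>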
06-08-2017 03:35 PM
Hi George,

I am not using the sandbox; I have a standalone installation of Spark and NiFi on my PC. I am using Apache NiFi 1.2.0 and I have followed the entire tutorial. I get an error on import org.apache.nifi.events._:

<console>:38: error: object events is not a member of package org.apache.nifi
import org.apache.nifi.events._

I have included all the relevant jars that you mentioned:

nifi-site-to-site-client-1.2.0.jar
nifi-spark-receiver-1.2.0.jar
nifi-api-1.2.0.jar
nifi-utils-1.2.0.jar
nifi-client-dto-1.2.0.jar

I opened all the jars, and sure enough there is no org.apache.nifi.events directory in any of them. How can I find this missing import? (Roughly the spark-shell invocation I use is sketched after the code below.)

I also tried to run the code in IntelliJ. I don't get any errors there, but the receiver stops with the following message:

17/06/08 18:16:14 INFO ReceiverSupervisorImpl: Stopping receiver with message: Registered unsuccessfully because Driver refused to start receiver 0

I copied the following code into IntelliJ and commented out the last line.

// Import all the libraries required
import org.apache.nifi._
import java.nio.charset._
import org.apache.nifi.spark._
import org.apache.nifi.remote.client._
import org.apache.spark._
import org.apache.nifi.events._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.nifi.remote._
import org.apache.nifi.remote.protocol._
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import java.io._
import org.apache.spark.serializer._
object SparkNiFiAttribute {
  def main(args: Array[String]): Unit = {
    /*
    import java.util
    val additionalJars = new util.ArrayList[String]
    additionalJars.add("/home/arsalan/NiFiSparkJars/nifi-site-to-site-1.2.0.jar")
    */
    // Set an app name and build the Spark configuration
    val config = new SparkConf().setAppName("Nifi_Spark_Data")
      // .set("spark.driver.extraClassPath","/home/arsalan/NiFiSparkJars/nifi-site-to-site-client-1.2.0.jar:/home/arsalan/NiFiSparkJars/nifi-spark-receiver-1.2.0.jar:/home/arsalan/nifi-1.2.0/lib/nifi-api-1.2.0.jar:/home/arsalan/nifi-1.2.0/lib/bootstrap/nifi-utils-1.2.0.jar:/home/arsalan/nifi-1.2.0/work/nar/framework/nifi-framework-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-1.2.0.jar")
      .set("spark.driver.allowMultipleContexts", "true")
      .setMaster("local[*]")
    // Build a site-to-site client config with the NiFi web URL and the output port name ("Data_to_Spark", created in step 6c)
    val conf = new SiteToSiteClient.Builder().url("http://localhost:8080/nifi").portName("Data_to_Spark").buildConfig()
    // Create a StreamingContext with a 1-second batch interval
    val ssc = new StreamingContext(config, Seconds(1))
    ssc.sparkContext.getConf.getAll.foreach(println)
    // Create a DStream using a NiFi receiver so that we can pull data from the specified port
    val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
    // Map the data from NiFi to text, ignoring the attributes
    val text = lines.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
    // Print the first ten elements of each RDD generated
    text.print()
    // Start the computation
    ssc.start()
  }
}
//SparkNiFiAttribute.main(Array())
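For completeness, this is roughly how I launch spark-shell with the jars (a sketch; the paths are shortened, and on my machine they point into the NiFi lib directories):

spark-shell --jars nifi-site-to-site-client-1.2.0.jar,nifi-spark-receiver-1.2.0.jar,nifi-api-1.2.0.jar,nifi-utils-1.2.0.jar,nifi-client-dto-1.2.0.jar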