04-20-2018 03:32 AM · 3 Kudos
Hi All,
Earlier I upgraded my development cluster from 5.13.0 to 5.14.2.
All is working now apart from Spark2.
When we try to run spark2-shell, the following error occurs:
ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Exception when registering SparkListener
at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2364)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:553)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:103)
[...]
Caused by: java.io.FileNotFoundException: Lineage directory /var/log/spark2/lineage doesn't exist or is not writable.
at com.cloudera.spark.lineage.LineageWriter$.checkLineageConfig(LineageWriter.scala:158)
at com.cloudera.spark.lineage.NavigatorAppListener.<init>(ClouderaNavigatorListener.scala:30)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[...]
Anyone able to help? Thanks
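For what it's worth, the stack trace suggests the Navigator lineage listener just expects /var/log/spark2/lineage to exist and be writable on the host where spark2-shell runs, so a possible workaround might be to create it by hand (untested on my side, and the spark user and group are guesses):

sudo mkdir -p /var/log/spark2/lineage
sudo chown spark:spark /var/log/spark2/lineage
sudo chmod 775 /var/log/spark2/lineage

I'd still like to know the proper fix, though.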
Labels:
- Apache Spark
- Cloudera Manager
04-10-2018 12:23 AM
Thanks @Dan Burkert - I've done some more work on this and it seems that it's possible, but I ran into a few issues. There is no Kudu sink for Spark Structured Streaming, so you have to use a ForeachWriter sink instead, then convert the state class to a DataFrame and upsert one row at a time. I backed out of this approach because, when calling Seq() on the value output by the ForeachWriter to convert it to a DataFrame, the Spark session was lost within the process stage, and I was unable to initialise a Spark session at that point. My code, in case anyone has tried this before and knows a way around it:

df.select([...])
  .as[IoTState]
  .groupByKey(_.assetId)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateIoTStateFromEvent)
  .writeStream
  .option("checkpointLocation", conf.stateCheckPoint)
  .foreach(new ForeachWriter[IoTState] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(value: IoTState): Unit = {
      // This is where it falls over: toDF() needs an active SparkSession,
      // which isn't available inside the writer on the executor.
      val valueDF: DataFrame = Seq(value).toDF()
      kuduContext.upsertRows(valueDF, conf.kuduTable)
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
  .awaitTermination()
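If I pick this up again, the direction I'd try is to drop kuduContext inside the writer and talk to Kudu through the plain Java client instead, so process() never needs a SparkSession at all. A rough sketch of that idea (untested; the IoTState shape and the Kudu column names are placeholders, not my real schema):

import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
import org.apache.spark.sql.ForeachWriter

// Assumed shape of the state class from the post above.
case class IoTState(assetId: String, lastSeen: Long)

// A ForeachWriter that upserts through the Kudu Java client directly,
// so nothing in process() depends on a SparkSession being available
// on the executor.
class KuduUpsertWriter(kuduMaster: String, tableName: String)
    extends ForeachWriter[IoTState] {

  @transient private var client: KuduClient = _
  @transient private var session: KuduSession = _
  @transient private var table: KuduTable = _

  override def open(partitionId: Long, version: Long): Boolean = {
    client = new KuduClient.KuduClientBuilder(kuduMaster).build()
    table = client.openTable(tableName)
    session = client.newSession()
    true
  }

  override def process(value: IoTState): Unit = {
    val upsert = table.newUpsert()
    val row = upsert.getRow
    row.addString("asset_id", value.assetId) // column names are guesses
    row.addLong("last_seen", value.lastSeen)
    session.apply(upsert)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (session != null) session.close()
    if (client != null) client.close()
  }
}

// Usage in the query above: .foreach(new KuduUpsertWriter(conf.kuduMaster, conf.kuduTable))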
04-03-2018 02:53 AM
Hello all, I'm trying to build a streaming application that keeps the current state of an IoT device. I want to store the device's current location, the last time it reported in, and some aggregations in an arbitrary state. I've got the code for the above working with output to the console (roughly the shape sketched below), but I now need to productionise it by making the state queryable for a user, e.g. "where is my IoT device now?". I'm using Structured Streaming with Spark, and I'd like to either A) use Kudu as a sink, query Kudu for the latest data, and upsert the data when it changes, or B) access the state some other way. The only way I've found to do option A so far is a ForeachWriter. Has anyone done this before? Thanks
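For reference, this is roughly the shape of the console version I have working (a minimal sketch rather than my real code: the event/state fields, the socket source and the update function are all stand-ins):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, Trigger}

// Stand-in shapes for the device events and the arbitrary state.
case class IoTEvent(assetId: String, lat: Double, lon: Double, ts: Long)
case class IoTState(assetId: String, lat: Double, lon: Double, lastSeen: Long)

object IoTStateConsoleApp {

  // Keep the newest event per device; merging aggregations into the
  // previous state is elided for brevity.
  def updateIoTStateFromEvent(
      assetId: String,
      events: Iterator[IoTEvent],
      state: GroupState[IoTState]): IoTState = {
    val latest = events.maxBy(_.ts)
    val updated = IoTState(assetId, latest.lat, latest.lon, latest.ts)
    state.update(updated)
    updated
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("iot-state").getOrCreate()
    import spark.implicits._

    // Toy source: one CSV event per line (assetId,lat,lon,ts).
    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()
      .as[String]
      .map { line =>
        val p = line.split(",")
        IoTEvent(p(0), p(1).toDouble, p(2).toDouble, p(3).toLong)
      }

    events
      .groupByKey(_.assetId)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateIoTStateFromEvent)
      .writeStream
      .outputMode("update")
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
      .awaitTermination()
  }
}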
Labels:
- Apache Kudu
- Apache Spark