
Spark Streaming issue when using stateful transformations

I'm using the HDP-2.4 sandbox for my Python application. A Kafka consumer streams messages into a Spark Streaming application. With stateless transformations everything works fine; the problem arises when I switch to stateful transformations with updateStateByKey() and checkpointing.
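
To illustrate the switch (a minimal sketch, not my real pipeline; pairs stands for any (key, value) DStream):

# Stateless: aggregates within each batch only; this variant works.
counts = pairs.reduceByKey(lambda x, y: x + y)

# Stateful: carries a running count across batches; this needs a
# checkpoint directory and is where the failure starts.
ssc.checkpoint("/tmp")
running = pairs.updateStateByKey(lambda new, last: sum(new) + (last or 0))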

I get the following error:

ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.IOException: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/hdp/2.4.0.0-169/spark/python/pyspark/streaming/util.py", line 105, in dumps

...

  File "/usr/lib64/python2.6/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
    at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

... 61 more

The JARs and packages I load are as follows:

pyspark \
  --jars $SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar,\
$SPARK_HOME/lib/spark-hdp-assembly.jar,\
$SPARK_HOME/lib/spark-examples-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar,\
$KAFKA_HOME/libs/kafka_2.10-0.9.0.2.4.0.0-169.jar,\
$KAFKA_HOME/libs/kafka-clients-0.9.0.2.4.0.0-169.jar \
  --packages com.databricks:spark-csv_2.10:1.5.0,\
org.apache.spark:spark-streaming-kafka_2.10:1.6.0,\
com.datastax.spark:spark-cassandra-connector_2.10:1.6.0
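
As a sanity check on the classpath (a sketch, assuming py4j's usual name resolution): in Spark 1.6 the Python Kafka bindings go through a JVM helper class, and when the assembly jar is not visible to the driver, names under sc._jvm resolve to a bare 'JavaPackage', which is the same symptom as in the traceback above.

# Run inside the pyspark session launched above.
# Printing a JavaClass suggests the Kafka assembly is on the classpath;
# printing a JavaPackage means the class could not be found.
print(sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper)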

The code snippet for creating the StreamingContext and calling updateStateByKey() is as follows:

...
# imports used by the snippet below
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
conf = SparkConf() \
    .setAppName("Streaming Airlines") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "10.0.0.52")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

ssc = StreamingContext(sc, 30)
kvs = KafkaUtils.createStream(ssc, "10.0.0.52:2181", "spark-streaming-consumer", {'file-topic': 1})

ssc.checkpoint("/tmp")  # checkpoint directory, required by updateStateByKey

def updateFunction(newValues, runningCount):
    # runningCount is None the first time a key is seen
    return sum(newValues) + (runningCount or 0)
...
rows = originRows.union(destRows) \
    .reduceByKey(lambda x, y: x + y) \
    .repartition(1) \
    .transform(lambda rdd: rdd.sortByKey(ascending=False)) \
    .updateStateByKey(updateFunction)
...
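
To check whether checkpointing alone triggers this, independent of Kafka, a minimal test along these lines could be run in a fresh pyspark session (a sketch; the names and checkpoint path are made up):

from pyspark.streaming import StreamingContext

# Same updateStateByKey + checkpoint combination, fed from a queueStream
# instead of Kafka; the original error surfaced at ssc.start().
ssc_test = StreamingContext(sc, 5)
ssc_test.checkpoint("/tmp/state-test")
stream = ssc_test.queueStream([sc.parallelize([("a", 1), ("b", 1)])])
counts = stream.updateStateByKey(lambda new, last: sum(new) + (last or 0))
counts.pprint()
ssc_test.start()
ssc_test.awaitTerminationOrTimeout(30)
ssc_test.stop(stopSparkContext=False)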

The versions bundled with HDP-2.4 (Python 2.6, Spark 1.6.0, Kafka 0.9, Scala 2.10, py4j-0.9-src.zip) should be mutually compatible, although I'm not sure about Python 2.6, which is really old.
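
For reference, the versions in play can be confirmed from the same session (a quick check; sc.version and py4j.__version__ are standard attributes):

import sys
import py4j

print(sys.version)       # Python interpreter (2.6.x on this sandbox)
print(sc.version)        # Spark version (expecting 1.6.0)
print(py4j.__version__)  # py4j bridge version (expecting 0.9)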

What am I missing?