
SparkStreaming issue when using Stateful transformations

I'm using the HDP-2.4 sandbox platform for my Python application. A Kafka consumer streams messages to the Spark Streaming application. With stateless transformations everything works fine. The problem arises when I switch to stateful transformations with updateStateByKey() and checkpoint().

I get the following error:

ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/hdp/", line 105, in dumps
  File "/usr/lib64/python2.6/", line 306, in save
    rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
	at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:100)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	... 61 more

The jar libraries and packages that I load are as follows:

pyspark --jars $SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar,\
$SPARK_HOME/lib/spark-hdp-assembly.jar,\
$SPARK_HOME/lib/spark-examples-,\
$KAFKA_HOME/libs/kafka_2.10-,\
$KAFKA_HOME/libs/kafka-clients- \
    --packages com.databricks:spark-csv_2.10:1.5.0,org.apache.spark:spark-streaming-kafka_2.10:1.6.0,com.datastax.spark:spark-cassandra-connector_2.10:1.6.0

The code snippet for creating the StreamingContext and calling updateStateByKey is as follows:


from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf() \
    .setAppName("Streaming Airlines") \
    .setMaster("local[2]") \
    .set("", "")  # config key/value elided in the original post
sc = SparkContext(conf=conf)

ssc = StreamingContext(sc, 30)  # 30-second batch interval
# ZooKeeper quorum elided in the original post
kvs = KafkaUtils.createStream(ssc, "", "spark-streaming-consumer", {'file-topic': 1})


def updateFunction(newValues, runningCount):
    # Accumulate the running count for each key across batches
    return sum(newValues) + (runningCount or 0)

# originRows and destRows are DStreams derived from kvs (definitions omitted here)
rows = originRows.union(destRows) \
    .reduceByKey(lambda x, y: x + y) \
    .repartition(1) \
    .transform(lambda rdd: rdd.sortByKey(ascending=False)) \
    .updateStateByKey(updateFunction)
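For reference, updateFunction follows the standard updateStateByKey contract: for each key it receives the list of new values from the current batch plus the previous state (or None on first sight). The behavior can be simulated locally in plain Python, without Spark; the batches below are made-up sample data, not from my application:

```python
# Local simulation of updateStateByKey semantics, using the same
# update function as in the snippet above. Sample batches are
# hypothetical, for illustration only.

def updateFunction(newValues, runningCount):
    # Add this batch's values to the running count (None on first batch)
    return sum(newValues) + (runningCount or 0)

def run_batch(state, batch):
    """Apply updateFunction per key, as updateStateByKey would per batch."""
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    for key, new_values in grouped.items():
        state[key] = updateFunction(new_values, state.get(key))
    return state

state = {}
state = run_batch(state, [("JFK", 1), ("LAX", 1), ("JFK", 1)])
state = run_batch(state, [("JFK", 1), ("SFO", 1)])
print(state)  # {'JFK': 3, 'LAX': 1, 'SFO': 1}
```

This logic works as expected on its own, which is why I believe the error is in how the stateful DStream is serialized, not in the update function itself.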

The library versions bundled with HDP-2.4 (Python 2.6, Spark 1.6.0, Kafka 0.9, and Scala 2.10) should be mutually compatible, although I'm not sure about Python 2.6, which is really old.

What am I missing?