Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Kakfa Spark Streaming Error

avatar
Super Collaborator

Hi Team,
I am trying to copy streaming data from Kafka topic to HDFS directory.
It is throwing an error 'java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper'

Any kind of assistance to help me resolve is much appreciated.

Here are the steps I followed
Step 1 - Created topic -> topicXYZ
STep 2 - created producer and linked to topicXYZ
Step 3 - created consumer and linked to topicXYZ

=> pyspark program to stream and copy data to HDFS directory
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
sc = SparkContext(appName="kafksteststreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
kafkaStream = KafkaUtils.createStream(ssc, 'xxxx:2181', 'raw-event-streaming-consumer', {'topicXYZ':1})parsed = kafkaStream.map(lambda (k, v): json.loads(v))
parsed.saveAsTextFiles('/tmp/folderxyz')
ssc.start()
ssc.awaitTermination()

spark-submit --jars /usr/hdp/current/spark-client/lib/spark-assembly-*.jar spkf.py

The above code is throwing error

Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the spark-submit command
    1. $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6
  2. Download the JAR of the artifact from Maven Central http://search.maven.org/, Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.4.0. Then, include the jar in the spark-submit command as
1 ACCEPTED SOLUTION

avatar
Rising Star

@Viswa

Add this configuration in your pom.xml under build tag, rather than adding jar in spark-submit.

<descriptorRefs>

<descriptorRef>jar-with-dependencies</descriptorRef>

</descriptorRefs>

View solution in original post

3 REPLIES 3

avatar
Rising Star

@Viswa

Add this configuration in your pom.xml under build tag, rather than adding jar in spark-submit.

<descriptorRefs>

<descriptorRef>jar-with-dependencies</descriptorRef>

</descriptorRefs>

avatar
Super Collaborator

Thank you @Ramya Jayathirtha

I downloaded jar file from maven and executed,it went through this.

Now its blocked different error

File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/sourcefiles/spkf.py", line 9, in <lambda>
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

avatar
Super Collaborator

@Ramya Jayathirtha

it worked

thank you for your timely response