Created 01-30-2018 07:16 PM
Hi Team,
I am trying to copy streaming data from a Kafka topic to an HDFS directory.
It throws the error 'java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper'.
Any help resolving this is much appreciated.
Here are the steps I followed:
Step 1 - Created topic -> topicXYZ
Step 2 - Created a producer and linked it to topicXYZ
Step 3 - Created a consumer and linked it to topicXYZ
=> PySpark program to stream the data and copy it to an HDFS directory:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
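sc = SparkContext(appName="kafksteststreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)  # 60-second batch interval
# Receiver-based stream: ZooKeeper quorum, consumer group id, {topic: partitions to consume}
kafkaStream = KafkaUtils.createStream(ssc, 'xxxx:2181', 'raw-event-streaming-consumer', {'topicXYZ':1})
# Each record is a (key, value) pair; parse the value as JSON
parsed = kafkaStream.map(lambda kv: json.loads(kv[1]))
# Writes one output directory per batch, named /tmp/folderxyz-<batch time in ms>
parsed.saveAsTextFiles('/tmp/folderxyz')
ssc.start()
ssc.awaitTermination()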
I submitted it with:
spark-submit --jars /usr/hdp/current/spark-client/lib/spark-assembly-*.jar spkf.py
and it fails with this error:
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
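The spark-assembly jar passed with --jars does not contain the Kafka integration classes such as KafkaUtilsPythonHelper, so passing it again does not help. One way to add them is spark-submit's --packages flag, which resolves a Maven coordinate and its dependencies at submit time. The sketch below only builds that command in Python; build_submit_command is a hypothetical helper, and the coordinate org.apache.spark:spark-streaming-kafka_2.10:1.6.3 is an assumption (Spark 1.6 on Scala 2.10) that must be matched to the Spark and Scala versions on your cluster.

```python
# ASSUMPTION: Spark 1.6 built for Scala 2.10 (typical of HDP's spark-client);
# change this coordinate to match the Spark/Scala versions on your cluster.
KAFKA_PACKAGE = "org.apache.spark:spark-streaming-kafka_2.10:1.6.3"


def build_submit_command(script, packages=(KAFKA_PACKAGE,)):
    """Hypothetical helper: return spark-submit arguments using --packages.

    --packages takes a comma-separated list of Maven coordinates and pulls
    them (plus their dependencies) onto the driver and executor classpaths.
    """
    return ["spark-submit", "--packages", ",".join(packages), script]


if __name__ == "__main__":
    # Print the command; pass the list to subprocess.check_call to run it.
    print(" ".join(build_submit_command("spkf.py")))
```

The printed line would replace the --jars command above. For --packages to resolve, the submitting host needs access to Maven Central or a repository supplied with --repositories.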
Created 01-30-2018 07:47 PM
Add this configuration to the maven-assembly-plugin configuration under the build tag in your pom.xml, rather than adding the jar in spark-submit:
<descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs>
Created 01-30-2018 08:32 PM
Thank you @Ramya Jayathirtha
I downloaded the jar file from Maven and ran the job with it, and it got past this error.
Now it is blocked by a different error:
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/sourcefiles/spkf.py", line 9, in <lambda>
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))
  File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
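This ValueError means json.loads received a message value that is not valid JSON, for example an empty message or plain text typed into a console producer (what the Step 2 producer actually sends is not shown, so that is an assumption). A minimal sketch, assuming you would rather skip such records than fail the whole batch; parse_json_value is a hypothetical helper name:

```python
import json


def parse_json_value(record):
    """Parse the value of a Kafka (key, value) record as JSON.

    Hypothetical helper. Returns a one-element list on success and an empty
    list otherwise, so DStream.flatMap drops records that are not valid JSON.
    """
    _key, value = record
    try:
        return [json.loads(value)]
    except (ValueError, TypeError):
        # ValueError: empty or malformed text (json.JSONDecodeError subclasses it on Python 3)
        # TypeError: the value is None or not a string
        return []
```

In spkf.py the map line would become parsed = kafkaStream.flatMap(parse_json_value). Returning a list lets flatMap parse and filter in one pass; if every message is being dropped, check what the producer is actually writing to topicXYZ rather than relying on the filter.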
Created 01-30-2018 09:34 PM