Created 01-30-2018 07:16 PM
Hi Team,
I am trying to copy streaming data from a Kafka topic to an HDFS directory.
It throws the error 'java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper'.
Any help resolving this is much appreciated.
Here are the steps I followed:
Step 1 - Created the topic topicXYZ.
Step 2 - Created a producer that writes to topicXYZ.
Step 3 - Created a consumer that reads from topicXYZ.
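Step 2 does not say how the producer writes its messages. Because the Spark job below calls json.loads on every message, each message value has to be a complete JSON document. A minimal sketch of a JSON producer, assuming the kafka-python client (not mentioned in the post) and a placeholder broker address xxxx:6667 (6667 is the usual HDP broker port; adjust for your cluster):

```python
import json


def to_json_bytes(event):
    """Serialize one event (a dict) to UTF-8 JSON bytes for a Kafka message value."""
    return json.dumps(event, sort_keys=True).encode("utf-8")


def send_events(events, bootstrap_servers="xxxx:6667", topic="topicXYZ"):
    """Publish each event to the topic as a JSON-encoded message.

    Assumes the kafka-python package; the broker address is a placeholder.
    """
    # Imported lazily so to_json_bytes() can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=to_json_bytes)
    for event in events:
        producer.send(topic, event)  # value_serializer turns the dict into JSON bytes
    producer.flush()  # block until all buffered messages have been sent
    producer.close()
```

For example, send_events([{"id": 1, "type": "click"}]) would publish one JSON message to topicXYZ. Plain text typed into a console producer would not parse on the Spark side.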
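=> PySpark program to stream the data and copy it to an HDFS directory:
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafksteststreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)  # 60-second batch interval

# Receiver-based stream: ZooKeeper quorum, consumer group id, {topic: number of consumer threads}
kafkaStream = KafkaUtils.createStream(ssc, 'xxxx:2181', 'raw-event-streaming-consumer', {'topicXYZ': 1})

# Each record is a (key, message) tuple; parse the message body as JSON.
# (lambda (k, v): ... is Python 2 only; indexing works in Python 2 and 3.)
parsed = kafkaStream.map(lambda kv: json.loads(kv[1]))

# Writes one output directory per batch, named /tmp/folderxyz-<batch time in ms>
parsed.saveAsTextFiles('/tmp/folderxyz')

ssc.start()
ssc.awaitTermination()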
Submitted with:
spark-submit --jars /usr/hdp/current/spark-client/lib/spark-assembly-*.jar spkf.py
It fails with this error:
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
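This error means the Kafka connector for Spark Streaming is not on the classpath; the generic spark-assembly jar passed with --jars does not include it. One option is to let spark-submit fetch the connector with --packages. A minimal sketch that builds that command, assuming Spark 1.6.3 with Scala 2.10 (a guess from the spark-client path; check your actual versions). The helper names are hypothetical:

```python
def kafka_package(spark_version, scala_version):
    """Return the Maven coordinate of the Kafka 0.8 DStream connector.

    Spark 1.x published it as spark-streaming-kafka_<scala>;
    Spark 2.x renamed it to spark-streaming-kafka-0-8_<scala>.
    """
    major = int(spark_version.split(".")[0])
    if major == 1:
        artifact = "spark-streaming-kafka_" + scala_version
    elif major == 2:
        artifact = "spark-streaming-kafka-0-8_" + scala_version
    else:
        raise ValueError("no Kafka 0.8 DStream connector for Spark %s" % spark_version)
    return "org.apache.spark:%s:%s" % (artifact, spark_version)


def submit_command(script, spark_version="1.6.3", scala_version="2.10"):
    """Build a spark-submit argv that pulls the connector via --packages."""
    # Versions are assumptions; match them to the cluster's Spark/Scala build.
    return ["spark-submit", "--packages",
            kafka_package(spark_version, scala_version), script]


if __name__ == "__main__":
    print(" ".join(submit_command("spkf.py")))
```

--packages resolves the artifact from Maven Central, so the submitting host needs network access. On an offline cluster, downloading the matching spark-streaming-kafka-assembly jar and passing it with --jars achieves the same thing.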
Created 01-30-2018 07:47 PM
Add this configuration to the maven-assembly-plugin section under the <build> tag of your pom.xml, rather than adding the jar in spark-submit:
<descriptorRefs>
  <descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
Created 01-30-2018 08:32 PM
Thank you @Ramya Jayathirtha
I downloaded the jar file from Maven and ran the job again; that got past this error.
Now it is blocked by a different error:
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/sourcefiles/spkf.py", line 9, in <lambda>
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))
  File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
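The traceback ends in json.loads inside the map lambda, so at least one message value on topicXYZ is not valid JSON (for example a blank line or plain text typed into a console producer), and a single bad record fails the whole task. A minimal sketch of a tolerant parser, assuming you would rather skip such messages than stop the job; parse_record is a hypothetical helper, not part of PySpark:

```python
import json
import logging

log = logging.getLogger("spkf")


def parse_record(record):
    """Decode the message of a (key, message) pair from KafkaUtils.createStream.

    Returns a list for use with DStream.flatMap: [obj] on success,
    [] when the message is empty or not valid JSON.
    """
    _key, value = record
    if not value or not value.strip():
        return []  # empty or whitespace-only message
    try:
        return [json.loads(value)]
    except ValueError:  # Python 3's JSONDecodeError is a subclass of ValueError
        log.warning("skipping non-JSON message: %r", value[:200])
        return []
```

In spkf.py this would replace the map call with parsed = kafkaStream.flatMap(parse_record). flatMap discards the empty lists, so only decoded objects reach saveAsTextFiles, and the warnings in the executor logs show what the producer is actually sending.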
Created 01-30-2018 09:34 PM