Member since
02-25-2016
72
Posts
34
Kudos Received
5
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3653 | 07-28-2017 10:51 AM | |
2831 | 05-08-2017 03:11 PM | |
1191 | 04-03-2017 07:38 PM | |
2896 | 03-21-2017 06:56 PM | |
1185 | 02-09-2017 08:28 PM |
01-30-2018
08:32 PM
Thank you @Ramya Jayathirtha I downloaded jar file from maven and executed,it went through this. Now its blocked different error File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/sourcefiles/spkf.py", line 9, in <lambda> parsed = kafkaStream.map(lambda v: json.loads(v[1])) File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads return _default_decoder.decode(s) File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode raise ValueError("No JSON object could be decoded") ValueError: No JSON object could be decoded
... View more
01-30-2018
07:16 PM
Hi Team, I am trying to copy streaming data from Kafka topic to HDFS directory. It is throwing an error 'java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper' Any kind of assistance to help me resolve is much appreciated. Here are the steps I followed Step 1 - Created topic -> topicXYZ STep 2 - created producer and linked to topicXYZ Step 3 - created consumer and linked to topicXYZ => pyspark program to stream and copy data to HDFS directory from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils import json sc = SparkContext(appName="kafksteststreaming") sc.setLogLevel("WARN") ssc = StreamingContext(sc, 60) kafkaStream = KafkaUtils.createStream(ssc, 'xxxx:2181', 'raw-event-streaming-consumer', {'topicXYZ':1})parsed = kafkaStream.map(lambda (k, v): json.loads(v)) parsed.saveAsTextFiles('/tmp/folderxyz') ssc.start() ssc.awaitTermination() spark-submit --jars /usr/hdp/current/spark-client/lib/spark-assembly-*.jar spkf.py The above code is throwing error Spark Streaming's Kafka libraries not found in class path. Try one
of the following. Include the Kafka library and its dependencies with in the spark-submit command $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6 Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id =
spark-streaming-kafka-assembly, Version = 1.4.0. Then, include the jar
in the spark-submit command as
... View more
Labels:
- Labels:
-
Apache Kafka
-
Apache Spark
11-09-2017
06:53 PM
1 Kudo
Hi Team, I have a requirement to read an existing hive table, massage few columns and overwrite back the same hive table. Below is the code lp=hc.sql('select * from logistics_prd') adt=hc.sql('select * from senty_audit.maintable') cmb_data=adt.unionAll(lp) cdc_data=cmb_data.distinct() cdc_data.write.mode('overwrite').saveAsTable('senty_audit.temptable') In step 2 I am reading senty_audit.maintable from hive. Then I am joining with other dataframe, in last step I am trying to load back(OVERWRITE) to same hive table. In this case spark is throwing an error 'org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from'. Can you please help me understand how should I proceed in this scenario.
... View more
Labels:
- Labels:
-
Apache Hive
-
Apache Spark
11-01-2017
06:28 PM
Hi team, I am looking to convert a unix timestamp field to human readable format. Can some one help me in this. I am using from unix_timestamp('Timestamp', "yyyy-MM-ddThh:mm:ss"), but this is not working. Any suggestions would be of great help
... View more
Labels:
- Labels:
-
Apache Ambari
10-25-2017
03:28 PM
@Aditya Sirna thank you for your response. I have tried that too, this way it is still creating a folder with name 'finename.txt' and one single file 'part-00000' file in the folder. Kindly let me know if there is a way to create a file directly on specified path
... View more
10-25-2017
03:01 PM
Hi Team, I am using python spark 1.6 trying to save RDD to hdfs path, when I am giving command rdd.saveAsTextFile('/path/finename.txt') it is saving directory and files in it. It is creating directory in specified path instead of creating file Can you suggest a way to create a file in hdfs specified path instead of directory and files in it.
... View more
Labels:
- Labels:
-
Apache Spark
10-19-2017
03:15 AM
1 Kudo
rdd.sortByKey() sorts in ascending order. I want to sort in descending order. I tried rdd.sortByKey("desc") but it did not work
... View more
Labels:
- Labels:
-
Apache Spark
10-13-2017
07:22 PM
@Dinesh Chitlangia Thank you for explanation. In that case i would rather use reducByKey() to get the number of occurence. thanks for the info on CountByValue()
... View more
10-13-2017
06:02 PM
1 Kudo
Hi Team, I am working on to get count out number of occurence in a file. RDD1 = RDD.flatMap(lambda x:x.split('|')) RDD2 = RDD1.countByValue() I want to save the output of RDD2 to textfile. i am able to see output by for x,y in RDD2.items():print(x,y) but when tried to save to textfile using RDD2.saveAsTextFile(\path) it is not working. it was throwing as 'AttributeError: 'collections.defaultdict' object has no attribute 'saveAsTextFile'' Can you please help me understanding if i am missing something here. Or how to save countByValue to text file
... View more
Labels:
- Labels:
-
Apache Spark