Created 06-10-2016 10:18 AM
I am trying to fetch JSON-format data from Kafka through Spark Streaming, and I want to create a temp table in Spark so I can query the JSON like a normal table.
I tried several tutorials available on the internet but didn't have any success. I am able to read a text file from HDFS and process it through Spark, but I am stuck on consuming JSON data from Kafka.
Can somebody guide me on this?
Created 06-10-2016 10:53 AM
And shameless plug:
You can have a look at the parser class I wrote. You would need to write something similar that parses your JSON object and returns a Java/Scala object that you can then use in your analytics.
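As a rough illustration of that idea in Python (the class name and fields here are hypothetical, loosely modeled on the clickstream JSON discussed later in this thread):

```python
import json

class ClickEvent:
    """Hypothetical parsed-message wrapper: turns one raw JSON string
    from Kafka into a plain Python object usable in analytics."""

    def __init__(self, raw):
        d = json.loads(raw)
        self.uid = d.get("uid")
        self.url = d.get("url")
        self.latency = float(d.get("latency", 0.0))

# Parse one sample message into a typed object.
event = ClickEvent('{"uid": "u42", "url": "http://example.com/", "latency": 12.5}')
print(event.uid, event.latency)
```

You would map this over the stream so that downstream operators work with objects instead of raw strings.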
Created 06-10-2016 10:22 AM
Could you please try this
Created 06-10-2016 10:39 AM
You can refer to the Spark documentation below:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Thanks and Regards,
Sindhu
Created 06-10-2016 11:22 AM
Nice article.
Created 06-10-2016 02:59 PM
Thanks a lot 🙂
Created 06-10-2016 03:14 PM
I have an annotated Scala example here: https://community.hortonworks.com/articles/33275/receiving-avro-messages-through-kafka-in-a-spark-s....
Created 06-11-2016 02:36 PM
I have created a Kafka producer:

from kafka import KafkaProducer
import json, time

userdata = {
    "ipaddress": "172.16.0.57",
    "logtype": "",
    "mid": "",
    "newsession": "4917279149950184029a78e4a-e694-438f-b994-39897e346953",
    "previousurl": "/",
    "searchtext": "",
    "sessionid": "29a78e4a-e694-438f-b994-39897e346953",
    "source": "desktop",
    "uid": "Chrome4929a78e4a-e694-438f-b994-39897e346953",
    "url": "http://172.16.0.57/",
    "useragent": "Mozilla/5.0%20(Windows%20NT%2010.0",
    "utmsocial": "null",
    "utmsource": "null",
    "createdtime": "2016-05-03 12:27:38",
    "latency": 13260.0,
    "serviceurl": "http://localhost:8080/Business-Web/services/product/getBestDealNew",
    "domainlayeripaddress": "localhost",
    "name": "TJ"
}

producer = KafkaProducer(bootstrap_servers=['172.16.10.13:6667', '172.16.10.14:6667'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print("adding", i)
    producer.send('event', userdata)
    #if i < 10:
    #    producer.send('event', '\n')
    time.sleep(3)
And here is the Python code to consume the JSON data from Kafka. I run it like this:
spark-submit --jars /usr/hdp/2.3.4.7-4/spark/lib/spark-assembly-1.5.2.2.3.4.7-4-hadoop2.7.1.2.3.4.7-4.jar,/usr/hdp/2.3.4.7-4/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.1.jar /home/hadoop/tajinder/clickstream_streaming.py
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

sc = SparkContext(appName="Clickstream_kafka")
stream = StreamingContext(sc, 2)
kafka_stream = KafkaUtils.createStream(stream, "172.16.10.13:2181", "raw-event-streaming-consumer", {"event": 1})
parsed = kafka_stream.map(lambda (k, v): json.loads(v))
parsed.pprint()
stream.start()
stream.awaitTermination()
I am able to receive JSON data in Spark from Kafka, but how do I convert it to an RDD or a table (SchemaRDD) in PySpark so that RDD operations can be applied to it?
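One common pattern is to turn each micro-batch into a DataFrame inside foreachRDD and register it as a temp table. A sketch (not tested against your cluster; the table name "events" and the selected columns are my own choices, and the pyspark imports are kept inside the job function so the parsing helper can be reused on its own):

```python
import json

def parse_message(value):
    """Parse one Kafka message value (a JSON string) into a dict."""
    return json.loads(value)

def run(zk_quorum="172.16.10.13:2181", topic="event"):
    """Submit with spark-submit; pyspark is only imported at job time."""
    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="Clickstream_kafka")
    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc, 2)

    parsed = (KafkaUtils
              .createStream(ssc, zk_quorum, "raw-event-streaming-consumer", {topic: 1})
              .map(lambda kv: parse_message(kv[1])))

    def register_batch(time, rdd):
        # Each micro-batch becomes a DataFrame and a queryable temp table.
        if not rdd.isEmpty():
            df = sqlContext.createDataFrame(rdd.map(lambda d: Row(**d)))
            df.registerTempTable("events")
            sqlContext.sql("SELECT uid, url, latency FROM events").show()

    parsed.foreachRDD(register_batch)
    ssc.start()
    ssc.awaitTermination()
```

Note that the temp table only exists for the lifetime of each batch, so the SQL has to run inside the foreachRDD callback.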
Created 12-08-2016 03:24 AM
Did you look at jsonRDD? Something like this:

val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // pass in the RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where ....").collect
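Since the question was asked in PySpark, here is a rough Python equivalent (a sketch for Spark 1.x, where jsonRDD is available; the sample records are made up and the pyspark import lives inside the function so it only runs under spark-submit):

```python
import json

# Stand-in for an RDD of raw JSON strings coming off Kafka.
jsons = [
    '{"uid": "u1", "url": "/", "latency": 13260.0}',
    '{"uid": "u2", "url": "/home", "latency": 42.0}',
]

def query_json(sc):
    """Spark 1.x sketch: infer a schema from JSON strings, register a
    temp table, and run SQL over it. Call with a live SparkContext."""
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
    df = sqlContext.jsonRDD(sc.parallelize(jsons))  # pass in the RDD directly
    df.registerTempTable("testjson")
    return sqlContext.sql("SELECT uid FROM testjson WHERE latency > 100").collect()
```

Spark infers the column names and types from the JSON keys, so no explicit schema is needed here.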