Support Questions


Can someone point me to a good tutorial on using Spark Streaming with Kafka?

Contributor

I am trying to fetch JSON-format data from Kafka through Spark Streaming, and I want to create a temp table in Spark so I can query the JSON data like a normal table.

I tried several tutorials available on the internet but didn't have any success. I am able to read a text file from HDFS and process it through Spark, but I am stuck consuming JSON data from Kafka.

Can somebody guide me on this?

1 ACCEPTED SOLUTION

Master Guru

And shameless plug:

https://community.hortonworks.com/content/kbentry/25726/spark-streaming-explained-kafka-to-phoenix.h...

You can have a look at the parser class I wrote. You would need to write something similar that parses your JSON object and returns a Java/Scala object that you can then use in your analytics.

View solution in original post

8 REPLIES

Super Guru

@Tajinderpal Singh

You can refer to the Spark documentation below:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

Thanks and Regards,

Sindhu

Master Guru

And shameless plug:

https://community.hortonworks.com/content/kbentry/25726/spark-streaming-explained-kafka-to-phoenix.h...

You can have a look at the parser class I wrote. You would need to write something similar that parses your JSON object and returns a Java/Scala object that you can then use in your analytics.
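Since the question uses PySpark, a rough Python equivalent of such a parser is sketched below. This is only an illustration: the `ClickEvent` type and its fields are hypothetical, picked to match the sample record posted later in the thread, and you would adjust them to your own JSON schema.

```python
import json
from collections import namedtuple

# Hypothetical record type -- adjust the fields to your own JSON schema.
ClickEvent = namedtuple("ClickEvent", ["ipaddress", "sessionid", "url", "latency"])

def parse_event(raw):
    """Parse one raw JSON string into a ClickEvent, or None if malformed."""
    try:
        d = json.loads(raw)
    except ValueError:
        return None  # drop records that are not valid JSON
    return ClickEvent(d.get("ipaddress"), d.get("sessionid"),
                      d.get("url"), d.get("latency"))
```

Mapping `parse_event` over the stream and filtering out `None` then gives you typed records to run your analytics on.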

Super Guru

nice article..

Master Guru

Thanks a lot 🙂

Contributor
I have created a Kafka producer:

from kafka import KafkaProducer
import json,time


userdata={
        "ipaddress": "172.16.0.57",
        "logtype": "",
        "mid": "",
        "newsession": "4917279149950184029a78e4a-e694-438f-b994-39897e346953",
        "previousurl": "/",
        "searchtext": "",
        "sessionid": "29a78e4a-e694-438f-b994-39897e346953",
        "source": "desktop",
        "uid": "Chrome4929a78e4a-e694-438f-b994-39897e346953",
        "url": "http://172.16.0.57/",
        "useragent": "Mozilla/5.0%20(Windows%20NT%2010.0",
        "utmsocial": "null",
        "utmsource": "null",
        "createdtime": "2016-05-03 12:27:38",
        "latency": 13260.0,
        "serviceurl": "http://localhost:8080/Business-Web/services/product/getBestDealNew",
        "domainlayeripaddress": "localhost",
        "name":"TJ"
}


producer = KafkaProducer(
    bootstrap_servers=['172.16.10.13:6667', '172.16.10.14:6667'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    print("adding", i)
    producer.send('event', userdata)
    time.sleep(3)


Below is the Python code to consume the JSON data from Kafka. I run it like this:

spark-submit --jars /usr/hdp/2.3.4.7-4/spark/lib/spark-assembly-1.5.2.2.3.4.7-4-hadoop2.7.1.2.3.4.7-4.jar,/usr/hdp/2.3.4.7-4/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.1.jar /home/hadoop/tajinder/clickstream_streaming.py

from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import json
sc = SparkContext(appName="Clickstream_kafka")
stream = StreamingContext(sc, 2)
kafka_stream = KafkaUtils.createStream(stream,"172.16.10.13:2181","raw-event-streaming-consumer",{"event":1})
parsed = kafka_stream.map(lambda kv: json.loads(kv[1]))  # kv is (key, value); the value holds the JSON string
parsed.pprint()
stream.start()
stream.awaitTermination()

I am able to receive JSON data in Spark from Kafka, but how do I convert it to an RDD or a table (SchemaRDD) in PySpark so that RDD operations can be applied to it?
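One way to do this, sketched below and not tested against a live cluster, is to use `foreachRDD` and the Spark 1.x `sqlContext.jsonRDD` API to turn each micro-batch into a queryable temp table. The topic name, ZooKeeper address, and column names are taken from the code above; the table name `events` and the helper function names are my own.

```python
import json

def to_json_string(kv):
    """Kafka messages arrive as (key, value) pairs; keep only the raw JSON string."""
    _, value = kv
    return value

def run_stream():
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="Clickstream_kafka")
    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc, 2)

    kafka_stream = KafkaUtils.createStream(
        ssc, "172.16.10.13:2181", "raw-event-streaming-consumer", {"event": 1})
    json_strings = kafka_stream.map(to_json_string)

    def process(rdd):
        if rdd.isEmpty():
            return
        # jsonRDD infers the schema from the JSON strings (Spark 1.x API)
        df = sqlContext.jsonRDD(rdd)
        df.registerTempTable("events")
        sqlContext.sql("SELECT ipaddress, sessionid FROM events").show()

    json_strings.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    run_stream()
```

Each batch gets its own DataFrame, so the temp table only covers the current micro-batch; for queries across batches you would need to persist the data somewhere.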

Rising Star

Did you look at jsonRDD? Something like this:

val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // pass in the RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson WHERE .... ").collect()