
Help needed - PySpark Kafka streaming to Hive

Expert Contributor

Hi Everyone,

I am new to Spark.

I am trying to consume a topic from Kafka, which is a stream of simple JSON messages:

kafka-topic-name:20191203
-------------------------
{"animal":"lion","type":"carni","speed":"60"}
{"animal":"deer","type":"herbi","speed":"65"}
{"animal":"cat","type":"carni","speed":"15"}
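For reference, each message is a flat JSON object with three string-valued fields (`animal`, `type`, `speed`), which is what the schema in the code further down expects. A quick sanity check on the samples in plain Python:

```python
import json

# The three sample messages from the topic above
samples = [
    '{"animal":"lion","type":"carni","speed":"60"}',
    '{"animal":"deer","type":"herbi","speed":"65"}',
    '{"animal":"cat","type":"carni","speed":"15"}',
]

for line in samples:
    record = json.loads(line)
    # Every message carries exactly these three fields, all strings
    # (note that "speed" arrives as a string, not a number).
    assert set(record) == {"animal", "type", "speed"}
    assert all(isinstance(v, str) for v in record.values())
```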

My PySpark code looks like this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType

vAppName = "kafkaSparkStreaming"
vKafkaBootstrapServerList = "broker1:6668,broker2:6668"  # closing quote was missing
vKafkaSecurityProtocol = "SSL"
vKafkaSSLKeystoreLocation = "keystore.jks"
vKafkaSSLKeystorePassword = "somepassword"
vKafkaTopic = "20191203"

# Schema matching the JSON messages on the topic
schema = StructType() \
    .add("animal", StringType(), True) \
    .add("type", StringType(), True) \
    .add("speed", StringType(), True)

# A single session is enough; enable Hive support on it directly
spark = SparkSession.builder \
    .appName(vAppName) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", vKafkaBootstrapServerList) \
    .option("kafka.security.protocol", vKafkaSecurityProtocol) \
    .option("kafka.ssl.keystore.location", vKafkaSSLKeystoreLocation) \
    .option("kafka.ssl.keystore.password", vKafkaSSLKeystorePassword) \
    .option("subscribe", vKafkaTopic) \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

query = df.writeStream.format("memory").queryName("t10").start()

# The stream starts asynchronously; wait until the available offsets
# have been processed before querying the in-memory table, otherwise
# the table is still empty.
query.processAllAvailable()

raw = spark.sql("select parsed_value.* from t10")
raw.show()

When I run the above, I don't see any results.

My next step is to push the data into a Hive table (instead of the select, I thought of using an insert).
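One idea for that step is `foreachBatch`, which hands each micro-batch to a function that can do a normal batch write. A sketch of what I mean, assuming an existing Hive table (the table name `default.animals` and the checkpoint path are placeholders, not real names from my cluster):

```python
def write_batch(batch_df, batch_id):
    # Append one micro-batch into an existing Hive table.
    # The table name "default.animals" is an assumption.
    batch_df.write.mode("append").insertInto("default.animals")

# Wiring it into the stream (df is the parsed streaming DataFrame above);
# a checkpoint location is needed so the sink can recover after restarts:
#
#   query = (df.writeStream
#       .outputMode("append")
#       .option("checkpointLocation", "/tmp/animals_ckpt")
#       .foreachBatch(write_batch)
#       .start())
#   query.awaitTermination()
```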

Things to note:

My data volume into Kafka is 2M/sec.

Please help me with the above.

P.S.: As I am new to working with Spark, please let me know if the above approach is wrong and guide me to the correct one.