
help needed - pyspark kafka streaming to hive

Expert Contributor

Hi Everyone,

I am new to Spark.

I am trying to consume a topic from Kafka, which is a stream of simple JSON messages:

kafka-topic-name:20191203
-------------------------
{"animal":"lion","type":"carni","speed":"60"}
{"animal":"deer","type":"herbi","speed":"65"}
{"animal":"cat","type":"carni","speed":"15"}
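Just to spell out what the schema has to match: each sample record above is a flat JSON object with three string-valued fields. A quick plain-Python check (standard `json` module only, no Spark involved):

```python
import json

# one of the sample records from the topic above
raw = '{"animal":"lion","type":"carni","speed":"60"}'
record = json.loads(raw)

# all three fields arrive as strings, which is why the schema in the
# pyspark code declares three StringType columns
print(record)  # {'animal': 'lion', 'type': 'carni', 'speed': '60'}
```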

My pyspark code looks like this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType

vAppName = "kafkaSparkStreaming"
vKafkaBootstrapServerList = "broker1:6668,broker2:6668"
vKafkaSecurityProtocol = "SSL"
vKafkaSSLKeystoreLocation = "keystore.jks"
vKafkaSSLKeystorePassword = "somepassword"
vKafkaTopic = "20191203"

schema = StructType().add("animal",StringType(),True).add("type",StringType(),True).add("speed",StringType(),True)

# one session is enough; enable Hive support on the same builder
spark = SparkSession.builder.appName(vAppName).enableHiveSupport().getOrCreate()

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", vKafkaBootstrapServerList) \
.option("kafka.security.protocol", vKafkaSecurityProtocol) \
.option("kafka.ssl.keystore.location", vKafkaSSLKeystoreLocation) \
.option("kafka.ssl.keystore.password", vKafkaSSLKeystorePassword) \
.option("subscribe", vKafkaTopic) \
.load() \
.select(from_json(col("value").cast("string"),schema=schema).alias("parsed_value"))

# memory sink: keeps results in an in-memory table for debugging
query = df.writeStream.format("memory").queryName("t10").start()

raw = spark.sql("select parsed_value.* from t10")

When I run the above, I don't see any results.

My next step is to push the data into a Hive table (instead of the select, I thought of using an insert).

Things to note:

My data volume into Kafka is 2M/sec.
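For scale: assuming "2M/sec" means two million messages per second (my reading, not stated explicitly) and each record is about 45 bytes like the samples above, that works out to roughly 90 MB/s of raw payload:

```python
# back-of-envelope throughput estimate; both figures are assumptions,
# read from the post and the sample records rather than measured
messages_per_sec = 2_000_000
bytes_per_msg = len('{"animal":"lion","type":"carni","speed":"60"}')  # 45 bytes

bytes_per_sec = messages_per_sec * bytes_per_msg
print(bytes_per_sec / 1_000_000)  # 90.0 (MB/s)
```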

Please help me with the above.

P.S.: As I am new to working with Spark, please let me know if the above approach is wrong and point me to the correct one.
