Support Questions
Find answers, ask questions, and share your expertise

pyspark streaming writing data in to hive using foreachbatch method

Expert Contributor

Hi everybody,

I am trying the following approach to write data in to hive table.




import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming.kafka import KafkaUtils
import datetime
from pyspark.sql.functions import lit,unix_timestamp
from os.path import *
from pyspark import Row

warehouseLocation = abspath("spark-warehouse")
spark = SparkSession.builder.appName("spark_streaming").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()

kafka = "kafka"
offsets = "earliest"
servers = "server_1:port,server_2:port"
security_protocol = "SSL"
keystore_location = "keystore"
keystore_password = "keystore_password"
kafka_topic = "kafka_topic"
checkpoint_location ="/checkpoint/location"

def hiveInsert(df, batchId):
        spark.sql("insert into hive_db.hive_table select value, time_stamp from updates")

df = spark.readStream.format(kafka).option("startingoffsets", offsets).option("kafka.bootstrap.servers", servers).option("", security_protocol).option("kafka.ssl.keystore.location", keystore_location).option("kafka.ssl.keystore.password", keystore_password).option("subscribe",kafka_topic).load().selectExpr("CAST(value AS STRING)").select('value').withColumn('time_stamp',lit('%Y%m%d%H%M')))

query = df.writeStream.foreachBatch(hiveInsert).start()





The above code is not working

Any pointers are of great help!


New Contributor

Hi @mark_hadoop , Is this solved ? If yes ,what was the issue and how were you able to fix it ?