Hi everybody,
I am trying the following approach to write data in to hive table.
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming.kafka import KafkaUtils
import datetime
from pyspark.sql.functions import lit,unix_timestamp
from os.path import *
from pyspark import Row
warehouseLocation = abspath("spark-warehouse")
spark = SparkSession.builder.appName("spark_streaming").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
kafka = "kafka"
offsets = "earliest"
servers = "server_1:port,server_2:port"
security_protocol = "SSL"
keystore_location = "keystore"
keystore_password = "keystore_password"
kafka_topic = "kafka_topic"
checkpoint_location ="/checkpoint/location"
def hiveInsert(df, batchId):
df.createOrReplaceTempView("updates")
spark.sql("insert into hive_db.hive_table select value, time_stamp from updates")
df = spark.readStream.format(kafka).option("startingoffsets", offsets).option("kafka.bootstrap.servers", servers).option("kafka.security.protocol", security_protocol).option("kafka.ssl.keystore.location", keystore_location).option("kafka.ssl.keystore.password", keystore_password).option("subscribe",kafka_topic).load().selectExpr("CAST(value AS STRING)").select('value').withColumn('time_stamp',lit(datetime.datetime.now().strftime('%Y%m%d%H%M')))
query = df.writeStream.foreachBatch(hiveInsert).start()
query.awaitTermination()
The above code is not working
Any pointers are of great help!