PySpark streaming: writing data into Hive using the foreachBatch method
Labels: Apache Spark
Expert Contributor
Created on ‎02-18-2021 09:19 AM - edited ‎02-18-2021 09:21 AM
Hi everybody,
I am trying the following approach to write streaming data from Kafka into a Hive table:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming.kafka import KafkaUtils
import datetime
from pyspark.sql.functions import lit,unix_timestamp
from os.path import *
from pyspark import Row
warehouseLocation = abspath("spark-warehouse")
spark = SparkSession.builder.appName("spark_streaming").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
kafka = "kafka"
offsets = "earliest"
servers = "server_1:port,server_2:port"
security_protocol = "SSL"
keystore_location = "keystore"
keystore_password = "keystore_password"
kafka_topic = "kafka_topic"
checkpoint_location ="/checkpoint/location"
def hiveInsert(df, batchId):
    df.createOrReplaceTempView("updates")
    spark.sql("insert into hive_db.hive_table select value, time_stamp from updates")

df = (
    spark.readStream.format(kafka)
    .option("startingoffsets", offsets)
    .option("kafka.bootstrap.servers", servers)
    .option("kafka.security.protocol", security_protocol)
    .option("kafka.ssl.keystore.location", keystore_location)
    .option("kafka.ssl.keystore.password", keystore_password)
    .option("subscribe", kafka_topic)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select('value')
    .withColumn('time_stamp', lit(datetime.datetime.now().strftime('%Y%m%d%H%M')))
)
query = df.writeStream.foreachBatch(hiveInsert).start()
query.awaitTermination()
The above code is not working.
Any pointers would be a great help!
1 REPLY
New Contributor
Created ‎09-22-2021 05:17 AM
Hi @mark_hadoop, is this solved? If yes, what was the issue and how were you able to fix it?
Thanks,
Albin
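The thread does not record what the actual failure was, so the following is only a sketch of one direction that may be worth trying, not a confirmed fix. One possible cause is that the temporary view "updates" is registered on the session that owns the micro-batch DataFrame, which is not necessarily the same session as the global `spark` object used for `spark.sql(...)`. The sketch below sidesteps that by appending each micro-batch with `DataFrameWriter.insertInto` instead of going through a view. It also passes the checkpoint location to the writer (the original defines `checkpoint_location` but never uses it) and computes the timestamp with Spark functions, since `lit(datetime.datetime.now()...)` is evaluated only once, when the query is defined. The names `TARGET_TABLE`, `COLUMNS`, `hive_insert` and `run_stream` are illustrative, and the SSL options from the question are left out for brevity; they would be added with `.option(...)` exactly as in the original.

```python
# Hypothetical names for illustration; the table and columns come from the question.
TARGET_TABLE = "hive_db.hive_table"
COLUMNS = ["value", "time_stamp"]  # assumed to match the Hive table's column order


def hive_insert(batch_df, batch_id):
    """Append one micro-batch to the Hive table without using a temp view."""
    # insertInto matches columns by position, so select them explicitly.
    (batch_df.select(*COLUMNS)
        .write
        .mode("append")
        .insertInto(TARGET_TABLE))


def run_stream(servers, kafka_topic, checkpoint_location):
    """Build and start the streaming query (requires PySpark and the Kafka source)."""
    # Imported here so hive_insert can be exercised without a Spark installation.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import current_timestamp, date_format

    spark = (SparkSession.builder
             .appName("spark_streaming")
             .enableHiveSupport()
             .getOrCreate())

    df = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", servers)
          .option("subscribe", kafka_topic)
          .option("startingOffsets", "earliest")
          .load()
          .selectExpr("CAST(value AS STRING) AS value")
          # Evaluated by Spark while the query runs, not once on the driver.
          .withColumn("time_stamp",
                      date_format(current_timestamp(), "yyyyMMddHHmm")))

    query = (df.writeStream
             .foreachBatch(hive_insert)
             .option("checkpointLocation", checkpoint_location)
             .start())
    query.awaitTermination()
```

To try it, call `run_stream(servers, kafka_topic, checkpoint_location)` with the values from the question in a job submitted with the Kafka connector package available. If the temp-view approach is preferred, running the SQL through the micro-batch DataFrame's own session (for example `df.sparkSession.sql(...)` on Spark 3.3 or later) instead of the global `spark` is another option to test.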
