below is my code , i m reading the data from kafka having json data , and i wanted to store the data into postgresql. i have created the database and table with schema in postgrase but it doesnot allow streaming data ingestion.
val spark = SparkSession.builder().master("local[*]")
.appName("sample_data_testing").getOrCreate()
val schema = new StructType()
.add("name","String")
.add("serial_number","Long")
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
val data = df.select($"value" cast "string" as "json")
.select(from_json($"json", schema) as "data")
.select("data.*")
data.printSchema()
val pgdata = data.writeStream
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/spark_db")
.option("dbtable", "spark_data")
.option("user", "username")
.option("password", "password")
.start().awaitTermination()
}
}
error - Data source jdbc does not support streamed writing