Member since
08-21-2018
3
Posts
0
Kudos Received
0
Solutions
08-31-2018
04:02 AM
below is my code , i m reading the data from kafka having json data , and i wanted to store the data into postgresql. i have created the database and table with schema in postgrase but it doesnot allow streaming data ingestion.
val spark = SparkSession.builder().master("local[*]") .appName("sample_data_testing").getOrCreate()
val schema = new StructType() .add("name","String") .add("serial_number","Long")
import spark.implicits._ spark.sparkContext.setLogLevel("ERROR")
val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "topic1") .option("startingOffsets", "earliest") .load()
val data = df.select($"value" cast "string" as "json") .select(from_json($"json", schema) as "data") .select("data.*")
data.printSchema()
val pgdata = data.writeStream .format("jdbc") .option("url", "jdbc:postgresql://localhost:5432/spark_db") .option("dbtable", "spark_data") .option("user", "username") .option("password", "password") .start().awaitTermination() } }
error - Data source jdbc does not support streamed writing
... View more
Labels:
- Labels:
-
Apache Kafka
-
Apache Spark