I want to use PySpark to stream from a Kudu table, process the streaming data, and stream the results back into another Kudu table. Below is the driver code I've been trying out; the same pipeline works when I use a Kafka source instead (see the sketch after the error message below).
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Kudu-stream") \
    .getOrCreate()

# Streaming source: the Kudu table I want to read from.
df = spark \
    .readStream \
    .format("org.apache.kudu.spark.kudu") \
    .option("kudu.master", "example:7051") \
    .option("kudu.table", "db.source_test") \
    .load()

# Writes each micro-batch to the Kudu sink table.
def saveToKudu(batchDF, batchID):
    batchDF.write \
        .format("org.apache.kudu.spark.kudu") \
        .option("kudu.master", "example:7051") \
        .option("kudu.table", "db.sink_test") \
        .mode("append") \
        .save()

query = df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(saveToKudu) \
    .option("checkpointLocation", "hdfs://example:8020/path/to/checkpoint/") \
    .start()

query.awaitTermination()
This is the error message I've been getting:
py4j.protocol.Py4JJavaError: An error occurred while calling o40.load.
: java.lang.UnsupportedOperationException: Data source org.apache.kudu.spark.kudu does not support streamed reading
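For reference, the only thing I change in the working version is the source. A minimal sketch of that Kafka variant, with placeholder broker and topic names, where the same foreachBatch sink streams into Kudu without problems:

# Kafka source instead of Kudu -- this version streams fine.
# Broker address and topic name are placeholders.
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "example:9092") \
    .option("subscribe", "source_topic") \
    .load()

# Reuses the same saveToKudu foreachBatch sink as above.
query = kafka_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(saveToKudu) \
    .option("checkpointLocation", "hdfs://example:8020/path/to/checkpoint/") \
    .start()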
Is there a way to readStream directly from a Kudu table?
Extra: is it possible to writeStream to Kudu without using foreach or foreachBatch?
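In other words, something along these lines, if the Kudu data source supported streamed writing. This is only a sketch of what I mean, not code I know to work:

# Hypothetical direct streaming write to Kudu -- a sketch of what I'm asking
# about, not something I've been able to get working.
query = df \
    .writeStream \
    .format("org.apache.kudu.spark.kudu") \
    .option("kudu.master", "example:7051") \
    .option("kudu.table", "db.sink_test") \
    .option("checkpointLocation", "hdfs://example:8020/path/to/checkpoint/") \
    .outputMode("append") \
    .start()

query.awaitTermination()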