
How to auto update %spark.sql result in zeppelin for structured streaming query

I'm running Structured Streaming (Spark 2.1.0 with Zeppelin 0.7) on data coming from Kafka, and I'm trying to visualize the streaming result with Spark SQL, as below:

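%spark2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, sum}

// Zeppelin's %spark2 interpreter already provides a SparkSession named `spark`;
// getOrCreate() returns it, so some builder settings may not take effect.
val spark = SparkSession
  .builder()
  .appName("Spark structured streaming Kafka example")
  .master("yarn")
  .getOrCreate()
import spark.implicits._

// Subscribe to the "st" topic as a streaming DataFrame
val inputstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "n11.hdp.com:6667,n12.hdp.com:6667,n13.hdp.com:6667,n10.hdp.com:6667,n9.hdp.com:6667")
  .option("subscribe", "st")
  .load()

// Parse "pre_post_paid,DataUpload,DataDownload" values and total the traffic per plan type
val stream = inputstream.selectExpr("CAST(value AS STRING)").as[String]
  .select(
    expr("(split(value, ','))[0]").cast("string").as("pre_post_paid"),
    expr("(split(value, ','))[1]").cast("double").as("DataUpload"),
    expr("(split(value, ','))[2]").cast("double").as("DataDownload"))
  .filter("DataUpload is not null and DataDownload is not null")
  .groupBy("pre_post_paid")
  .agg((sum("DataUpload") + sum("DataDownload")).as("size"))

// Keep the complete running aggregate in an in-memory table named "test"
val query = stream.writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("test")
  .start()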

Once it is running, I query the "test" table as below:

%sql
select *
from test

The result only updates when I rerun the paragraph manually. How can I make it update automatically as new data is processed (a streaming visualization), as in this example:

Insights Without Tradeoffs: Using Structured Streaming in Apache Spark
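The memory sink updates the contents of the `test` table after every trigger (in complete mode it holds only the latest full result), but a `%sql` paragraph reads whatever snapshot exists at the moment it runs, so it has to be executed again to show new rows. A minimal sketch of doing that re-execution on a timer, assuming it runs in a `%spark2` paragraph after the code above; `StreamPoller`, `pollUntilStopped` and the refresh interval are hypothetical names chosen for illustration, not part of Spark or Zeppelin:

```scala
// Hypothetical helper: repeatedly waits up to `intervalMs` for a stream to stop and,
// while it is still running, calls `refresh` (e.g. re-query and print a table).
object StreamPoller {
  /** Returns how many times `refresh` ran before `waitForStop` reported that the stream stopped. */
  def pollUntilStopped(intervalMs: Long)(waitForStop: Long => Boolean)(refresh: () => Unit): Int = {
    var refreshes = 0
    // waitForStop is expected to block for up to intervalMs and return true once the stream has terminated
    while (!waitForStop(intervalMs)) {
      refresh()
      refreshes += 1
    }
    refreshes
  }
}
```

With the query from the question this could be called as `StreamPoller.pollUntilStopped(10000L)(ms => query.awaitTermination(ms))(() => spark.sql("select * from test").show(false))`, since `StreamingQuery.awaitTermination(timeoutMs)` returns `true` once the query has stopped. This prints plain-text tables rather than a Zeppelin chart, and because the paragraph keeps running, other paragraphs that share the Spark interpreter may queue behind it until the stream ends.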