
Spark 2 Scala Structured Streaming Query gives DFSClient: Caught exception


Hello community,

I am trying to write a Spark 2 Structured Streaming program in the Spark shell using Scala, following the Apache programming guides. The program consumes Kafka messages. I have verified with the Kafka console consumer that the Kafka messages exist in the proper format. Everything works well until I print the streaming query; at that point I get the error below. Any help would be greatly appreciated.

Config details: Ambari-managed HDP 2.6 on a single-node cluster running Spark 2, Kafka, and HDFS/YARN.

scala> printWindow(slidingWindowDS, "trade_volume")

res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1b320c39

scala> -------------------------------------------
Batch: 0
-------------------------------------------

[Stage 3:=================================> (121 + 3) / 200]17/08/10 23:01:54 WARN DFSClient: Caught exception
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(
        at java.lang.Thread.join(
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(
        at org.apache.hadoop.hdfs.DFSOutputStream$



+-----+---+------------+
+-----+---+------------+
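For context on the warning itself: it is the standard JVM behavior when a thread blocked in `Thread.join` is interrupted, which is what happens to HDFS's `DataStreamer` responder thread during shutdown. A minimal pure-Scala sketch of that mechanism (`JoinInterrupt`, `sleeper`, and `joiner` are illustrative names of my own, not Spark or HDFS internals):

```scala
import java.util.concurrent.atomic.AtomicReference

object JoinInterrupt {
  // Interrupting a thread that is blocked in join() raises
  // InterruptedException -- the same condition DFSClient logs as a warning.
  def run(): String = {
    val sleeper = new Thread(() => Thread.sleep(10000))
    sleeper.setDaemon(true)
    sleeper.start()

    val result = new AtomicReference[String]("")
    val joiner = new Thread(() => {
      try {
        sleeper.join() // blocks here, as DataStreamer.closeResponder does
        result.set("joined")
      } catch {
        case _: InterruptedException =>
          result.set("Caught exception java.lang.InterruptedException")
      }
    })
    joiner.start()
    Thread.sleep(200)  // give joiner time to block inside join()
    joiner.interrupt() // interrupt it, as happens when the writer is torn down
    joiner.join()
    result.get()
  }

  def main(args: Array[String]): Unit = println(run())
}
```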

My code is given below:

import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions._

def printWindow(windowDF: DataFrame, aggCol: String) = {
  windowDF.sort("window.start")
    .select("window.start", "window.end", s"$aggCol")
    .writeStream
    .outputMode("complete") // sorting an aggregated stream requires complete mode
    .format("console")      // prints each batch, as in the output above
    .start()
}
case class StockQuote(exchange: String,
                      ticker: String,
                      lastTradeDate: java.sql.Timestamp,
                      open: Float,
                      high: Float,
                      low: Float,
                      close: Float,
                      volume: Float)

val schema = Encoders.product[StockQuote].schema

// Use Spark DSL to create the quotes base data frame

val quotesDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", "stock-quotes")
  .option("startingOffsets", "earliest")
  .load()
  .withColumn("quote", from_json(col("value").cast("string"), schema))
  .select(col("quote.exchange"), col("quote.ticker"), col("quote.lastTradeDate"),
    col(""), col("quote.high"), col("quote.low"), col("quote.close"),
    col("quote.volume"))

val stocks2017DS = quotesDF.filter("year(lastTradeDate)==2017")

val slidingWindowDS = stocks2017DS
  .groupBy(window($"lastTradeDate", "3 minutes", "30 seconds")) // window duration and slide duration
  .agg(sum("volume").as("trade_volume"))

printWindow(slidingWindowDS, "trade_volume")
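For reference on what the `groupBy(window(...))` step produces: with a 3-minute window sliding every 30 seconds, each event lands in windowDuration / slideDuration = 6 overlapping windows, so each trade's volume is summed into 6 window rows. A small pure-Scala sketch of that assignment (`windowsFor` is a hypothetical helper of my own mirroring how `window()` buckets rows, not Spark code):

```scala
object SlidingWindows {
  // All [start, end) windows of length winMs, sliding every slideMs, that
  // contain timestamp tsMs (epoch millis).
  def windowsFor(tsMs: Long, winMs: Long, slideMs: Long): Seq[(Long, Long)] = {
    val lastStart  = tsMs - tsMs % slideMs       // latest start <= tsMs
    val firstStart = lastStart - winMs + slideMs // earliest start still covering tsMs
    (firstStart to lastStart by slideMs).map(s => (s, s + winMs))
  }

  def main(args: Array[String]): Unit = {
    val winMs   = 3 * 60 * 1000L // "3 minutes"
    val slideMs = 30 * 1000L     // "30 seconds"
    // each event contributes to winMs / slideMs = 6 windows
    println(windowsFor(100000L, winMs, slideMs).size)
  }
}
```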