Support Questions
Find answers, ask questions, and share your expertise

Spark Streaming - Kudu as sink with Upsert



New Contributor

Hello all,


I'm trying to build a streaming application that keeps the current state of an IoT device. I want to be able to store the IoT device's current location, the last time it reported in, and some aggregations held in arbitrary state.


I've got the code for the above working with output to the console, but I now need to productionise it by making the state queryable, so that I can answer questions for a user such as "where is my IoT device now?".

I'm using Structured Streaming with Spark, but I'd like to either:


A) use Kudu as a sink, upserting rows as the state changes, and query Kudu for the latest data, or


B) access the state some other way


The only way I've found to do option A so far is a ForeachWriter.


Has anyone done this before?



Re: Spark Streaming - Kudu as sink with Upsert

Cloudera Employee

Hi @wchandler, I don't have much experience with the Spark Structured Streaming APIs, but I know Kudu and the Spark RDD/Spark SQL integration well, so maybe I can help out. Perhaps it's just my lack of knowledge of the streaming APIs, but I'm having trouble pinpointing what's causing you trouble. Method A) seems reasonable to me; however, it seems like you'd only need to query Kudu once at application startup for the current state, and afterwards perhaps it can be kept in memory?
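Querying Kudu once at startup, as suggested above, might look roughly like this with the kudu-spark connector. The master address, application name, and table name below are placeholder assumptions, not values from this thread:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("iot-state").getOrCreate()

// Load the existing device state once at application startup.
// "kudu-master:7051" and "iot_state" are placeholder values.
val stateDF = spark.read
  .format("org.apache.kudu.spark.kudu")
  .option("kudu.master", "kudu-master:7051")
  .option("kudu.table", "iot_state")
  .load()
```

From there the loaded state could seed an in-memory map or be joined against the incoming stream.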

Re: Spark Streaming - Kudu as sink with Upsert

New Contributor

Thanks @Dan Burkert - I've done some more work on this and it seems it's possible, but I ran into a few issues with it.

There is no Kudu sink for Spark Structured Streaming, so you have to use a ForeachWriter sink instead, then convert the state class to a DataFrame and upsert one row at a time.


I backed out of this approach: when wrapping the value emitted to the ForeachWriter in a Seq() to convert it to a DataFrame, the SparkSession was not available within the process stage, and I was unable to initialise a SparkSession there.


My code, in case anyone has tried this before and knows a way around it:

[...]
.foreach(new ForeachWriter[IoTState] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: IoTState): Unit = {
    // Fails here: toDF() needs a SparkSession, which is not
    // available inside the writer on the executor
    val valueDF: DataFrame = Seq(value).toDF()
    kuduContext.upsertRows(valueDF, conf.kuduTable)
  }

  override def close(errorOrNull: Throwable): Unit = {}
})
.trigger(Trigger.ProcessingTime("2 seconds"))
[...]
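One possible way around the missing SparkSession is to skip the DataFrame conversion entirely and write each record with the Kudu Java client from inside the ForeachWriter, opening one client and session per partition in open(). A rough sketch, where the IoTState fields and the Kudu column names are assumptions made for illustration:

```scala
import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
import org.apache.spark.sql.ForeachWriter

// Hypothetical state class; the field names here are assumptions
case class IoTState(deviceId: String, lat: Double, lon: Double, lastSeen: Long)

class KuduUpsertWriter(masterAddresses: String, tableName: String)
    extends ForeachWriter[IoTState] {

  // Created lazily on the executor, so nothing Spark-specific is serialized
  @transient private var client: KuduClient = _
  @transient private var session: KuduSession = _
  @transient private var table: KuduTable = _

  override def open(partitionId: Long, version: Long): Boolean = {
    client = new KuduClient.KuduClientBuilder(masterAddresses).build()
    table = client.openTable(tableName)
    session = client.newSession()
    true
  }

  override def process(value: IoTState): Unit = {
    // Build one upsert per record; column names are assumed
    val upsert = table.newUpsert()
    val row = upsert.getRow
    row.addString("device_id", value.deviceId)
    row.addDouble("lat", value.lat)
    row.addDouble("lon", value.lon)
    row.addLong("last_seen", value.lastSeen)
    session.apply(upsert)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (session != null) session.close()
    if (client != null) client.close()
  }
}
```

Because only the two configuration strings are captured by the writer, nothing unserializable has to cross from the driver to the executors, which sidesteps the SparkSession problem described above.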