New Contributor
Posts: 3
Registered: ‎11-22-2017

Spark Streaming - Kudu as sink with Upsert

Hello all,

 

I'm trying to work on a streaming application that keeps the current state of an IoT device. I want to be able to store the IoT device's current location, the last time it reported in, and some aggregations in an arbitrary state.
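
For context, the state I'm keeping per device looks roughly like this (the field names are illustrative only, not my real schema):

// Hypothetical shape of the per-device state -- field names are illustrative.
case class IoTState(
  assetId: String,                  // device identifier (used as the grouping key later)
  latitude: Double,                 // last reported location
  longitude: Double,
  lastReported: java.sql.Timestamp, // last time the device reported in
  eventCount: Long                  // an example running aggregation
)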

 

I've got the code for the above working with output to the console, but my current issue is that I now need to productionise this code so that the state can be queried and surfaced to a user, e.g. "where is my IoT device now?"

I'm using Structured Streaming with Spark, but I'd like to either:

 

A) Use Kudu as a sink, upserting the data when it changes, and query Kudu for the latest state

or

B) Access the state some other way

 

The only way I've been able to see to do option A so far is the ForeachWriter.
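
i.e. implementing the three-method writer contract along these lines (just a skeleton, the class name is a placeholder and nothing Kudu-specific is filled in yet):

import org.apache.spark.sql.ForeachWriter

// Skeleton of the ForeachWriter contract: open/process/close are called per partition per trigger.
class KuduStateWriter extends ForeachWriter[IoTState] {
  override def open(partitionId: Long, version: Long): Boolean = true // return false to skip the partition
  override def process(value: IoTState): Unit = {
    // upsert `value` into Kudu here
  }
  override def close(errorOrNull: Throwable): Unit = {}
}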

 

Has anyone done this before?

Thanks

Cloudera Employee
Posts: 20
Registered: ‎09-28-2015

Re: Spark Streaming - Kudu as sink with Upsert

Hi @wchandler, I don't have much experience with the Spark Structured Streaming APIs, but I have a good bit of knowledge on Kudu and Spark RDD/SparkSQL, so maybe I can help out. Perhaps it's just my lack of knowledge about the streaming APIs, but I'm having trouble pinpointing what's causing you trouble. Option A) seems reasonable to me; however, it seems like you'd only need to query Kudu once at application startup for the current state, and afterwards perhaps it can be kept in memory?
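
For the initial read at startup, the kudu-spark datasource should do it, e.g. something like the following (spark is your SparkSession; the master address and table name are placeholders):

import org.apache.kudu.spark.kudu._

// One-off read of the current state table at application startup via the kudu-spark datasource.
// "kudu-master:7051" and "impala::default.iot_state" are placeholders for your cluster and table.
val currentState = spark.read
  .options(Map(
    "kudu.master" -> "kudu-master:7051",
    "kudu.table"  -> "impala::default.iot_state"))
  .format("org.apache.kudu.spark.kudu")
  .load()

currentState.createOrReplaceTempView("iot_state") // queryable with Spark SQL from here on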

New Contributor
Posts: 3
Registered: ‎11-22-2017

Re: Spark Streaming - Kudu as sink with Upsert

Thanks @Dan Burkert - I've done some more work on this and it seems that it's possible, but I ran into a few issues with it.

There is no sink for Kudu in Spark Structured Streaming, so you have to use a ForeachWriter sink instead, then convert the state class to a DataFrame and upsert one row at a time.

 

I backed out of this approach because, when wrapping the value output by the ForeachWriter in a Seq() to convert it to a DataFrame, the Spark session was lost within the process stage, and I was unable to initialise a Spark session there.

 

My code, in case anyone has tried this before and knows a way around it:

 

df.select([...])
  .as[IoTState]
  .groupByKey(_.assetId)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateIoTStateFromEvent)
  .writeStream
  .option("checkpointLocation", conf.stateCheckPoint)
  .foreach(new ForeachWriter[IoTState] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(value: IoTState): Unit = {
      // This is where it falls over: there is no SparkSession available
      // inside process() on the executor, so Seq(value).toDF() cannot work here.
      val valueDF: DataFrame = Seq(value).toDF()
      kuduContext.upsertRows(valueDF, conf.kuduTable)
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
  .awaitTermination()
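
One possible way around it might be to skip the DataFrame/KuduContext inside the ForeachWriter entirely and use the Kudu Java client directly, creating the client in open() so it lives on the executor. A rough sketch only (the master address and column names are placeholders):

import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
import org.apache.spark.sql.ForeachWriter

// Sketch: upsert through the Kudu Java client so no SparkSession is needed inside process().
// Master address and column names are placeholders for the real schema.
class KuduUpsertWriter(kuduMaster: String, tableName: String) extends ForeachWriter[IoTState] {
  @transient private var client: KuduClient = _
  @transient private var session: KuduSession = _
  @transient private var table: KuduTable = _

  override def open(partitionId: Long, version: Long): Boolean = {
    client = new KuduClient.KuduClientBuilder(kuduMaster).build()
    table = client.openTable(tableName)
    session = client.newSession()
    true
  }

  override def process(value: IoTState): Unit = {
    val upsert = table.newUpsert()
    val row = upsert.getRow
    row.addString("asset_id", value.assetId)     // placeholder column names
    row.addDouble("latitude", value.latitude)
    row.addDouble("longitude", value.longitude)
    session.apply(upsert)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (session != null) session.close()
    if (client != null) client.close()
  }
}

The writer would then be passed as .foreach(new KuduUpsertWriter("kudu-master:7051", conf.kuduTable)) in place of the anonymous one above. Newer Spark versions also have a foreachBatch sink where each micro-batch arrives as a DataFrame on the driver, which would let kuduContext.upsertRows be used directly, but I haven't tried that.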

 

 
