Use a defined class as State in a Spark Streaming application

I need to use a user-defined class as the State in a Spark Streaming application. I know this is possible, but when an event with a new key arrives in the mapWithState function, I can't create a new object of the class because doing so makes the function non-serializable. Examples are below. I'm using Spark 1.6.

This works, but throws a java.util.NoSuchElementException when an event with a new key appears:

val mappingFunc = (phone: String, value: Option[Cdr], state: State[User]) => {
  val cdr = value.getOrElse(null)
  val user = state.get()
  user.update(cdr)
  state.update(user)
  user
}

When I try to create a new user, I get an org.apache.spark.SparkException: Task not serializable:

val mappingFunc = (phone: String, value: Option[Cdr], state: State[User]) => {
  val cdr = value.getOrElse(null)
  val user = state.getOption().getOrElse(new User(cdr))
  user.update(cdr)
  state.update(user)
  user
}

How can I create a new user if doing so makes the function non-serializable?
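Separately from the serialization error, the NoSuchElementException in the first version comes from state.get(), which throws when no state exists yet for the key, and both versions pass null on to User when value is empty. A minimal sketch of the update step as a pure function over Option values, assuming a stand-in Cdr with only a phone field and a hypothetical helper object UserUpdates (neither is from the original post):

```scala
// Hypothetical stand-in for the poster's Cdr type; only `phone` is assumed.
case class Cdr(phone: String)

// Same shape as the User class shown later in the thread.
class User(cdr: Cdr) extends Serializable {
  var phone: String = cdr.phone
  var calls: Int = 0
  def update(cdr: Cdr): Unit = { calls += 1 }
}

// Hypothetical helper: the update logic with no Spark types involved,
// so it can be exercised without a cluster.
object UserUpdates {
  def next(value: Option[Cdr], current: Option[User]): Option[User] =
    value match {
      case Some(cdr) =>
        // New key: no state yet, so build the user from the first event.
        val user = current.getOrElse(new User(cdr))
        user.update(cdr)
        Some(user)
      case None =>
        // No event for this key in the batch: leave the state untouched.
        current
    }
}

// Assumed wiring inside mapWithState (untested sketch):
// val mappingFunc = (phone: String, value: Option[Cdr], state: State[User]) => {
//   val updated = UserUpdates.next(value, state.getOption())
//   if (!state.isTimingOut()) updated.foreach(state.update)
//   updated
// }
```

Keeping the logic in a top-level object means the function passed to StateSpec.function only has to reference UserUpdates, not an enclosing class.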

Re: Use a defined class as State in a Spark Streaming application

Is your `User` class serializable? I cannot tell exactly why the task is not serializable from the information you provided. Can you also provide more details, such as the implementation of `User`, if possible?

Re: Use a defined class as State in a Spark Streaming application

Yes, User is serializable (or I think it is). Here is the implementation of the class:

class User(cdr: Cdr) extends Serializable {
  var phone: String = cdr.phone
  var calls: Int = 0
  def update(cdr: Cdr) {
    calls += 1
  }
}

I am also including the line where the exception is raised:

var statespec = StateSpec.function(mappingFunc)
// statespec is used later in cdrs.mapWithState(statespec)
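
The thread does not pin down the cause, so what follows is an assumption rather than a diagnosis. One common source of "Task not serializable" is a closure that captures its enclosing instance. If User is declared inside another class, `new User(cdr)` needs a reference to that outer object, while calling methods on an already existing user does not. That would match the difference between the two versions of mappingFunc. The sketch below reproduces the effect with plain Java serialization and no Spark; Job, InnerUser, Funcs, and SerializationCheck are hypothetical names, and Cdr is a stand-in with only a phone field.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // True if Java serialization accepts the object, false if some
  // reachable object does not implement Serializable.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}

// Hypothetical stand-in for the poster's Cdr type.
case class Cdr(phone: String)

// Top-level version of the User class from the thread.
class User(cdr: Cdr) extends Serializable {
  var phone: String = cdr.phone
  var calls: Int = 0
  def update(cdr: Cdr): Unit = { calls += 1 }
}

// Hypothetical driver class that is NOT serializable, standing in for
// whatever class or script defines mappingFunc in the real job.
class Job {
  // Inner class: constructing it requires a reference to the Job instance.
  class InnerUser(cdr: Cdr) extends Serializable {
    var calls: Int = 0
  }

  // `new InnerUser` forces the function to capture `this` (the Job).
  val capturing: Cdr => Int = cdr => new InnerUser(cdr).calls
}

// Same function shape, but it only touches top-level classes.
object Funcs {
  val clean: Cdr => Int = cdr => new User(cdr).calls
}
```

Passing `new Job().capturing` to `SerializationCheck.isSerializable` should report false, while `Funcs.clean` and a plain `new User(Cdr("555"))` should pass. If the real job behaves the same way, moving User and mappingFunc into a top-level object is the usual remedy.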