I need to use a custom class as the state in a Spark Streaming application. I know this is possible, but when an event with a new key arrives in the mapWithState function, I can't create a new object of the class, because doing so makes the function non-serializable. Examples are below. I'm using Spark 1.6.
This works, but it raises a java.util.NoSuchElementException when an event with a new key appears:
val mappingFunc = (phone: String, value: Option[Cdr], state: State[User]) => {
  val cdr = value.getOrElse(null)
  // state.get() throws java.util.NoSuchElementException when no state exists yet for this key
  val user = state.get()
  user.update(cdr)
  state.update(user)
  user
}
When I try to create a new User instead, I get an org.apache.spark.SparkException: Task not serializable:
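val mappingFunc = (phone: String, value: Option[Cdr], state: State[User]) => {
  val cdr = value.getOrElse(null)
  // Fall back to a new User for a key that has no state yet; this is what triggers "Task not serializable"
  val user = state.getOption().getOrElse(new User(cdr))
  user.update(cdr)
  state.update(user)
  user
}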
How can I create a new User for a new key when doing so makes the function not serializable?
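Since the definitions of User and Cdr aren't shown, the following is only an assumption about the cause: a frequent source of "Task not serializable" here is that User either isn't Serializable or is declared as an inner class of a non-serializable class (for example the application/driver class), so calling new User(...) inside the closure captures the enclosing instance. The sketch below is plain Scala with no Spark dependency. It uses Java serialization, roughly what Spark applies to task closures, to compare a closure that builds a top-level case class with one that builds an inner class. Cdr, User, User.empty, StreamingApp, InnerUser and SerializationCheck are hypothetical names, and Option[User] stands in for Spark's State[User].

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for the question's Cdr and User types.
final case class Cdr(phone: String, durationSec: Long)

// Top-level case class: Serializable by default, and constructing it
// does not need a reference to any enclosing instance.
final case class User(phone: String, calls: Int, totalSec: Long) {
  // Returns an updated copy rather than mutating in place.
  def update(cdr: Cdr): User =
    copy(calls = calls + 1, totalSec = totalSec + cdr.durationSec)
}

object User {
  // Hypothetical factory for a key that has no state yet.
  def empty(phone: String): User = User(phone, 0, 0L)
}

// Assumed pitfall: a non-serializable class (e.g. the app class)
// that declares the state class as an inner class.
class StreamingApp {
  class InnerUser(val phone: String)
  // `new InnerUser` needs StreamingApp.this, so this closure captures it.
  val makeInnerUser: String => InnerUser = phone => new InnerUser(phone)
}

object SerializationCheck {
  // Same shape as mappingFunc, with Option[User] standing in for State[User].
  // value.map skips the update when there is no new Cdr, instead of passing null.
  val mappingLogic: (String, Option[Cdr], Option[User]) => Option[User] =
    (phone, value, existing) =>
      value.map(cdr => existing.getOrElse(User.empty(phone)).update(cdr))

  // Java-serializes the object and reports whether that succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(mappingLogic))                     // true
    println(isSerializable(new StreamingApp().makeInnerUser)) // false
    println(mappingLogic("555", Some(Cdr("555", 30L)), None)) // Some(User(555,1,30))
  }
}
```

If this assumption matches the real code, the Spark equivalent would be to declare User at the top level (or in a standalone object) as a Serializable class, and keep using state.getOption() (or check state.exists()) before reading the state, so new keys get a fresh User instead of hitting state.get().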