com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class $iwC$$iwC$CallLog]

I was working through Learning Spark, chapter 6 (working with partitions), and ran into a JsonMappingException:

No suitable constructor found for type [simple type, class $iwC$$iwC$CallLog]: can not instantiate from JSON object (need to add/enable type information?)

The CallLog class is defined as follows:

case class CallLog(callsign: String="", contactlat: Double, contactlong: Double, mylat: Double, mylong: Double)

The HTTP response data looks like this:

http://new73s.herokuapp.com/qsos/KK6JKQ.json

The code that generates the exception is:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient

// Define the CallLog format (in the real data, all values are Strings).
case class CallLog(callsign: String="", contactlat: Double,
    contactlong: Double, mylat: Double, mylong: Double)

// Create the JSON reader/writer
def createMapper() = {
    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    mapper
}

// HTTP exchange that delivers the (JSON) call log for each callsign.
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
    val exchange = new ContentExchange()
    exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
    client.send(exchange)
    (sign, exchange)
}

// Extract the JSON response,
// read it as CallLog, and return an Array[CallLog].
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
    exchange.waitForDone()
    val responseJson = exchange.getResponseContent()
    val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
    qsos
}

val contactsContactLists = validSigns.distinct().mapPartitions{
    signs =>
    val mapper = createMapper()
    // create a connection pool
    val client = new HttpClient()
    client.start()
    // create http request
    signs.map {sign =>
        createExchangeForSign(client, sign)
    // fetch responses
    }.map{ case (sign, exchange) =>
        (sign, readExchangeCallLog(mapper, exchange))
    }.filter(x => x._2 != null) // Remove empty CallLogs
}
println(contactsContactLists.collect().toList)

The validSigns RDD is lazily evaluated and holds data like [u'W8PAL', u'KK6JKQ', u'W6BB', u'VE3UOW', u'VE2CUA', u'VE2UN', u'OH2TI', u'GB1MIR', u'K2AMH', u'UA1LO', u'N7ICE'].

How can I get rid of the exception and get results? I am working in Zeppelin on the HDP 2.5 sandbox.

2 Replies

After some trial and error I got past the first error, but now a different exception is thrown.

The exception looks like this:

readExchangeCallLog: (mapper: com.fasterxml.jackson.databind.ObjectMapper, exchange: org.eclipse.jetty.client.ContentExchange)Array[CallLog]
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)

The source code is:

import java.io.Serializable
import org.apache.spark._
import org.apache.spark.SparkContext._

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient


case class CallLog(callsign: String="", contactlat: Double,
    contactlong: Double, mylat: Double, mylong: Double)


// Create a JSON reader with the Scala module registered
def createMapper() = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    mapper
}


// HTTP container that fetches the (JSON) call log for each callsign from the DB
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
    val exchange = new ContentExchange() with Serializable
    exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
    client.send(exchange)
    (sign, exchange)
}


// Once the HTTP container has the response, extract the JSON String response,
// read it as CallLog, and return the contents as Array[CallLog].
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
    exchange.waitForDone()
    val responseJson = exchange.getResponseContent()
    val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
    qsos
}


// Create an HTTP client and fetch the log for each callsign,
// convert the JSON log to CallLog via the mapper,
// and return only the entries that have a call log.
val contactsContactLists = validSigns.distinct().mapPartitions{
    signs =>
    val mapper = createMapper()
    // create a connection pool
    val client = new HttpClient()
    client.start()
    // create http request
    signs.map {sign =>
        createExchangeForSign(client, sign)
    // fetch responses
    }.map{ case (sign, exchange) =>
        (sign, readExchangeCallLog(mapper, exchange))
    }.filter(x => x._2 != null) // Remove empty CallLogs
}
println(contactsContactLists.collect().toList)

"Task not serializable" is unrelated to the Jackson error and very common.

In the Scala API, operations on RDDs such as map() work by copying the state of the lambda expression to all the worker nodes and executing it there. For this to happen, every object referenced inside the expression must be "Serializable" in the strict Java API sense: it is declared as something that can be serialized to a byte stream, sent over the network, and reconstructed at the far end.

Something you declared outside the map and are trying to use inside it isn't serializable. At a guess, it is one of the Jetty classes, such as the "exchange" variable. The workaround is to create the object inside the lambda expression, out of data that has been serialized (strings, etc.).
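
To make that concrete, here is a minimal sketch that runs without Spark. It assumes plain Java serialization is a fair stand-in for the check that fails in ClosureCleaner.ensureSerializable in the stack trace above. FakeClient is a hypothetical placeholder for a non-serializable class such as a Jetty client, and ClosureCheck and isSerializable are illustrative names, not Spark APIs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource (e.g. an HTTP client).
// It deliberately does NOT extend Serializable.
class FakeClient {
  def fetch(sign: String): String = s"response for $sign"
}

object ClosureCheck {
  // Tries to write the object with Java serialization and reports
  // whether that succeeded. Illustrative helper, not part of Spark.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val client = new FakeClient

    // This function captures `client`, which was created outside of it,
    // so serializing the function also has to serialize the client.
    val capturing: String => String = sign => client.fetch(sign)

    // This function only uses its String argument and builds the client
    // inside its own body, so nothing non-serializable is captured.
    val selfContained: String => String = sign => new FakeClient().fetch(sign)

    println(s"capturing closure serializable:      ${isSerializable(capturing)}")
    println(s"self-contained closure serializable: ${isSerializable(selfContained)}")
  }
}
```

The first function is the pattern that tends to trigger "Task not serializable", and the second is the workaround described above. In a notebook such as Zeppelin, a closure can also drag in the surrounding paragraph's state, so the offending object is not always the one you expect.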