Created 02-27-2017 09:13 AM
I was working through the partition examples in chapter 6 of Learning Spark, but I got a JsonMappingException:
No suitable constructor found for type [simple type, class $iwC$$iwC$CallLog]: can not instantiate from JSON object (need to add/enable type information?)
The CallLog class is defined as follows:
case class CallLog(callsign: String="", contactlat: Double, contactlong: Double, mylat: Double, mylong: Double)
The HTTP response data looks like this:
http://new73s.herokuapp.com/qsos/KK6JKQ.json
The code that generates the exception is:
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient

// define calllog format - real data types are all String.
case class CallLog(callsign: String="", contactlat: Double, contactlong: Double, mylat: Double, mylong: Double)

// create JSON readerWriter
def createMapper() = {
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper
}

// http container which delivers (JSON typed)calllog per callsign.
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
  val exchange = new ContentExchange()
  exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
  client.send(exchange)
  (sign, exchange)
}

// Draw out json typed response,
// Read it with CallLog format, and Return Array[CallLog].
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
  exchange.waitForDone()
  val responseJson = exchange.getResponseContent()
  val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
  qsos
}

val contactsContactLists = validSigns.distinct().mapPartitions{ signs =>
  val mapper = createMapper()
  // create a connection pool
  val client = new HttpClient()
  client.start()
  // create http request
  signs.map {sign =>
    createExchangeForSign(client, sign)
  // fetch responses
  }.map{ case (sign, exchange) =>
    (sign, readExchangeCallLog(mapper, exchange))
  }.filter(x => x._2 != null) // Remove empty CallLogs
}

println(contactsContactLists.collect().toList)
The validSigns RDD (lazily evaluated) holds data like [u'W8PAL', u'KK6JKQ', u'W6BB', u'VE3UOW', u'VE2CUA', u'VE2UN', u'OH2TI', u'GB1MIR', u'K2AMH', u'UA1LO', u'N7ICE'].
How can I get rid of the exception and get results? I am working in Zeppelin on the HDP 2.5 sandbox.
Created 02-27-2017 12:47 PM
I tried a few changes. The first exception is gone, but now a different exception is thrown.
The exception is:
readExchangeCallLog: (mapper: com.fasterxml.jackson.databind.ObjectMapper, exchange: org.eclipse.jetty.client.ContentExchange)Array[CallLog]
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)
The source code is:
import java.io.Serializable
import org.apache.spark._
import org.apache.spark.SparkContext._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient

case class CallLog(callsign: String="", contactlat: Double, contactlong: Double, mylat: Double, mylong: Double)

// Create a JSON reader with the Scala module registered
def createMapper() = {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper
}

// HTTP container that fetches the (JSON) call log for each callsign from the DB
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
  val exchange = new ContentExchange() with Serializable
  exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
  client.send(exchange)
  (sign, exchange)
}

// Once the HTTP container has the response, take out the JSON String response,
// read it as CallLog, and return the contents as Array[CallLog].
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
  exchange.waitForDone()
  val responseJson = exchange.getResponseContent()
  val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
  qsos
}

// Create an HTTP client and fetch the log for each callsign,
// convert the JSON log to CallLog via the mapper,
// and return the result, excluding entries that have no call log.
val contactsContactLists = validSigns.distinct().mapPartitions{ signs =>
  val mapper = createMapper()
  // create a connection pool
  val client = new HttpClient()
  client.start()
  // create http request
  signs.map {sign =>
    createExchangeForSign(client, sign)
  // fetch responses
  }.map{ case (sign, exchange) =>
    (sign, readExchangeCallLog(mapper, exchange))
  }.filter(x => x._2 != null) // Remove empty CallLogs
}

println(contactsContactLists.collect().toList)
Created 03-02-2017 03:07 PM
The "Task not serializable" exception is unrelated to the JSON problem, and it is very common.
The way the Scala API works, operations on RDDs like map() work by having the state of the lambda expression copied over to all the worker nodes and then executed. For this to happen, every object referenced inside the expression must be "Serializable" in the strict Java sense: it is declared as something which can be serialized to a byte stream, sent over the network and reconstructed at the far end.
Something you have declared outside the map, and are trying to use inside it, isn't serializable. At a guess: one of the Jetty classes, like the "exchange" variable. Workaround? Create the object inside the lambda expression, out of data that has been serialized (strings etc.).
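To illustrate the workaround without needing a cluster, here is a minimal sketch in plain Scala. It assumes nothing about Spark internals beyond the point above, that a closure has to survive Java serialization. The Connection class, the ClosureDemo object and its method names are all hypothetical stand-ins (Connection plays the role of something like a Jetty client), not part of the code in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource (e.g. an HTTP client).
// It deliberately does NOT extend Serializable.
class Connection {
  def fetch(sign: String): String = s"response for $sign"
}

object ClosureDemo {
  // Try to Java-serialize a closure, roughly the check that fails
  // with "Task not serializable".
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The closure captures `conn`, which was created outside the lambda,
  // so serializing the closure also tries to serialize `conn`.
  def capturing(): String => String = {
    val conn = new Connection()
    (sign: String) => conn.fetch(sign)
  }

  // The closure creates the Connection inside its own body, so it captures
  // nothing that needs to be serialized; only the String argument travels.
  def creatingInside(): String => String =
    (sign: String) => {
      val conn = new Connection()
      conn.fetch(sign)
    }
}
```

In a Spark job the second shape corresponds to building the client inside the function passed to mapPartitions, so that it is constructed on the worker once per partition rather than shipped from the driver.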