Member since: 12-07-2016
Posts: 10
Kudos Received: 0
Solutions: 0
02-27-2017
12:47 PM
I made another attempt. It partially succeeded, but a different exception is generated:
readExchangeCallLog: (mapper: com.fasterxml.jackson.databind.ObjectMapper, exchange: org.eclipse.jetty.client.ContentExchange)Array[CallLog]
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)
The source code is:
import java.io.Serializable
import org.apache.spark._
import org.apache.spark.SparkContext._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient
case class CallLog(callsign: String="", contactlat: Double,
contactlong: Double, mylat: Double, mylong: Double)
// Create a JSON reader with the Scala feature module loaded
def createMapper() = {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
mapper
}
// HTTP exchange that fetches the (JSON) call log for each callsign from the DB
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
val exchange = new ContentExchange() with Serializable
exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
client.send(exchange)
(sign, exchange)
}
// Once the HTTP exchange completes, extract the JSON-formatted String response,
// parse it as CallLog records, and return them as Array[CallLog].
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
exchange.waitForDone()
val responseJson = exchange.getResponseContent()
val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
qsos
}
// Create an HTTP client, fetch the log matching each callsign,
// convert the JSON-formatted log into CallLog records via the mapper,
// and return the results, excluding entries with no call log.
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper()
// create a connection pool
val client = new HttpClient()
client.start()
// create http request
signs.map {sign =>
createExchangeForSign(client, sign)
// fetch responses
}.map{ case (sign, exchange) =>
(sign, readExchangeCallLog(mapper, exchange))
}.filter(x => x._2 != null) // Remove empty CallLogs
}
println(contactsContactLists.collect().toList)
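The "Task not serializable" error here usually comes from the closure capturing the Zeppelin/REPL wrapper object that holds createMapper, createExchangeForSign, and readExchangeCallLog. One common workaround (a sketch, not the book's exact code; the object name CallLogHelpers is hypothetical) is to move the helpers into a top-level serializable object, so the closure captures only that object rather than the REPL wrapper:

```scala
import java.io.Serializable
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical holder object: everything the partition closure needs lives
// here, so Spark serializes this small object instead of the REPL wrapper.
object CallLogHelpers extends Serializable {
  case class CallLog(callsign: String = "", contactlat: Double = 0,
                     contactlong: Double = 0, mylat: Double = 0, mylong: Double = 0)

  def createMapper(): ObjectMapper = {
    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    mapper
  }
}
```

Inside mapPartitions you would then call CallLogHelpers.createMapper() and so on. Note that the mapper, HttpClient, and ContentExchange are all created inside mapPartitions, so they never need to be shipped to executors; that is the point of the per-partition pattern. Making ContentExchange "with Serializable" does not actually make Jetty's class serializable.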
02-27-2017
09:13 AM
I worked through the Learning Spark Chapter 6 example on working with partitions.
But I got JsonMappingException: No suitable constructor found for type [simple type, class $iwC$$iwC$CallLog]: can not instantiate from JSON object (need to add/enable type information?)
The CallLog class is defined as follows:
case class CallLog(callsign: String="", contactlat: Double,
contactlong: Double, mylat: Double, mylong: Double)
The HTTP response data looks like this: http://new73s.herokuapp.com/qsos/KK6JKQ.json
My code generating the exception is:
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient
// Define the call log format - the real data fields are all String.
case class CallLog(callsign: String="", contactlat: Double,
contactlong: Double, mylat: Double, mylong: Double)
// Create the JSON reader/writer
def createMapper() = {
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
mapper
}
// HTTP exchange that delivers the (JSON-typed) call log per callsign.
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
val exchange = new ContentExchange()
exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
client.send(exchange)
(sign, exchange)
}
// Extract the JSON-typed response,
// parse it with the CallLog format, and return Array[CallLog].
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
exchange.waitForDone()
val responseJson = exchange.getResponseContent()
val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
qsos
}
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper()
// create a connection pool
val client = new HttpClient()
client.start()
// create http request
signs.map {sign =>
createExchangeForSign(client, sign)
// fetch responses
}.map{ case (sign, exchange) =>
(sign, readExchangeCallLog(mapper, exchange))
}.filter(x => x._2 != null) // Remove empty CallLogs
}
println(contactsContactLists.collect().toList)
The validSigns RDD lazily holds data like [u'W8PAL', u'KK6JKQ', u'W6BB', u'VE3UOW', u'VE2CUA', u'VE2UN', u'OH2TI', u'GB1MIR', u'K2AMH', u'UA1LO', u'N7ICE'].
How can I remove the exception and get results?
I am working with Zeppelin on the HDP 2.5 sandbox.
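The JsonMappingException ("No suitable constructor found ... $iwC$$iwC$CallLog") typically has two causes here: a plain ObjectMapper does not know how to construct Scala case classes, and case classes defined in the Zeppelin/spark-shell REPL get nested inside interpreter wrapper classes ($iwC$$iwC). A sketch of the usual fix for the first part, matching the Learning Spark example, is to mix ScalaObjectMapper into the mapper:

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

def createMapper() = {
  // The ScalaObjectMapper mixin plus DefaultScalaModule lets Jackson
  // instantiate Scala case classes from JSON.
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper
}
```

If the wrapper-class nesting still breaks deserialization, defining CallLog in a compiled jar (outside the REPL) avoids the $iwC nesting entirely.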
Labels:
Apache Spark
02-27-2017
07:18 AM
How can I see the dependency information needed to solve my error?
02-27-2017
05:05 AM
Thanks for your reply.
I succeeded with the following:
%spark.dep
z.reset()
z.load("org.eclipse.jetty:jetty-client:7.6.13.v20130916")
%spark
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient
DepInterpreter(%dep) deprecated. Remove dependencies and repositories through GUI interpreter menu instead.
DepInterpreter(%dep) deprecated. Load dependency through GUI interpreter menu instead.
res1: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2448cc17
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient
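As the deprecation warnings indicate, newer Zeppelin releases expect dependencies to be added through the Interpreter menu (spark interpreter → Dependencies → artifact) rather than %dep. For jobs run outside Zeppelin, a roughly equivalent option is Spark's spark.jars.packages property; a minimal config-fragment sketch (the file placement is an assumption about your setup):

```
# spark-defaults.conf
spark.jars.packages org.eclipse.jetty:jetty-client:7.6.13.v20130916
```

Spark then resolves the artifact from Maven Central at startup, the same way %dep's z.load does.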
02-26-2017
11:25 AM
Thanks for the replies. Could I do it like the following?
export PYSPARK_PYTHON=/opt/anaconda/bin/python
spark-submit spark-job.py
02-26-2017
11:13 AM
I get the following error with Zeppelin on HDP 2.5. I searched for how to install and import Scala packages the way I do with Python, but failed. How can I remove the error? Also, I don't want to use Eclipse for Spark; I want to use Zeppelin. Thanks.
The error says:
"<console>:63: error: object eclipse is not a member of package org
import org.eclipse.jetty.client.ContentExchange"
My code:
import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClient
def createExchangeForSign(client: HttpClient, sign: String): (String, ContentExchange) = {
val exchange = new ContentExchange()
exchange.setURL(s"http://new73s.herokuapp.com/qsos/${sign}.json")
client.send(exchange)
(sign, exchange)
}
def readExchangeCallLog(mapper: ObjectMapper, exchange: ContentExchange): Array[CallLog] = {
exchange.waitForDone()
val responseJson = exchange.getResponseContent()
val qsos = mapper.readValue(responseJson, classOf[Array[CallLog]])
qsos
}
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper()
// create a connection pool
val client = new HttpClient()
client.start()
// create http request
signs.map {sign =>
createExchangeForSign(client, sign)
// fetch responses
}.map{ case (sign, exchange) =>
(sign, readExchangeCallLog(mapper, exchange))
}.filter(x => x._2 != null) // Remove empty CallLogs
}
println(contactsContactLists.collect().toList)
Labels:
Apache Spark
12-07-2016
09:29 AM
I now run the sandbox in VirtualBox on Windows 10.
Labels:
Hortonworks Data Platform (HDP)
12-07-2016
02:51 AM
From the root user I ran 'su hdfs', and then it did not ask me for a password. Thank you!!
12-07-2016
02:25 AM
At http://ko.hortonworks.com/hadoop-tutorial/a-lap-around-apache-spark/, when I run 'sudo su hdfs', the system asks for the spark password.
What are the default passwords for the hdfs and spark users?
Labels:
Hortonworks Data Platform (HDP)