Spark : Need help identifying the reason for slowness

We have a Spark Streaming application that receives data from Kafka, parses it, and saves it to a database using the code below:

val lines: DStream[MetricTypes.InputStreamType] = myConsumer.createDefaultStream()
val keyDeLines = lines.filter(x => x != null && x._1 != null && x._2 != null).map { x =>
  val lmeMap: RawMetricsExtractor = new JsonExtractor[HttpEvent]()
  try lmeMap.aParser(x)
  catch {
    case ase: Exception =>
      logError("Error parsing item [" + new GsonBuilder().setPrettyPrinting().create.toJson(x._2) + "]", ase)
      None
  }
}
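As an aside, the try/catch-plus-None pattern above can be collapsed with `scala.util.Try`, which also makes the later `filter(_.isDefined).map(_.get)` step unnecessary. A minimal sketch in plain Scala (no Spark), where `parseEvent` is a hypothetical stand-in for `lmeMap.aParser`:

```scala
import scala.util.Try

// Hypothetical stand-in for lmeMap.aParser: any function that may throw.
def parseEvent(raw: String): Int = raw.trim.toInt

val raws = Seq("1", "oops", "3")
// Try(...).toOption turns a thrown exception into None, and flatMap
// drops the Nones, replacing filter(_.isDefined).map(_.get).
val parsed: Seq[Int] = raws.flatMap(r => Try(parseEvent(r)).toOption)
// parsed == Seq(1, 3)
```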
val tempLines = keyDeLines.filter(_.isDefined).map(_.get).map { x =>
  val keyExtractMap: KeyExtractor[MetricTypes.EnrichedKeyType] = new EnrichedEventExtractor()
  val eventExtractMap: KeyExtractor[MetricTypes.EventKeyType] = new TopicIdExtractor()
  (eventExtractMap.getKey(x), keyExtractMap.getKey(x))
}
val stateSpec = StateSpec.function(trackStateFunc _).timeout(Minutes(2))
val requestsWithState = tempLines.mapWithState(stateSpec)
requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach {
      case (accountId, enrichedId, ets, attributeMap) =>
        if (validateRecordForStorage(accountId, enrichedId, ets, attributeMap)) {
          val ds = new DynamoDBDataStore(connection)
          ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
        } else {
          println("Discarded record [enrichedId=" + enrichedId
            + ", accountId=" + accountId
            + ", ets=" + ets
            + "]")
        }
      case default =>
        logInfo("You gave me: " + default)
    }
  }
}
private def createNewConnection(): AmazonDynamoDBClient = {
  val amazonAWSAccessKey = "abcd"
  val amazonAWSSecretKey = "1234"
  val amazonDynamoDBEndpoint = "http://endpoint:9090"
  val client = new AmazonDynamoDBClient(new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey))
  client.setEndpoint(amazonDynamoDBEndpoint)
  client
}
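One thing worth noting in the code above: a new `DynamoDBDataStore` is constructed for every record, inside `partitionOfRecords.foreach`. If the store is safe to reuse within a partition (an assumption about its class, not something the code confirms), constructing it once per partition avoids that per-record cost. A sketch using a counting stub in place of the real store:

```scala
// Counting stub in place of the real DynamoDBDataStore (an assumption,
// not the author's class): tracks how many times it is constructed.
var constructions = 0
class Store { constructions += 1; def save(r: Int): Unit = () }

val partitionOfRecords = Seq(1, 2, 3, 4)

// As in the original foreach body: one Store per record.
partitionOfRecords.foreach { r => new Store().save(r) }
// constructions == 4

// Reuse pattern: one Store per partition, shared across the loop.
constructions = 0
val ds = new Store()
partitionOfRecords.foreach(ds.save)
// constructions == 1
```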

This is how much time each task is taking. We are consuming at ~10,000 records/sec but writing only 65,000 records per 8.4 minutes. Would greatly appreciate it if someone could point out why our job is so slow.
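For context on the write side: DynamoDB's `BatchWriteItem` API accepts at most 25 items per request, so one common way to cut per-item round trips is to group records client-side before writing. A minimal sketch of the grouping step, where `writeBatch` is a hypothetical stand-in for the actual batch call:

```scala
// BatchWriteItem allows at most 25 items per request, so records are
// grouped client-side; writeBatch is a hypothetical stand-in for it.
def writeBatch(batch: List[Int]): Unit = ()  // placeholder for the real write

val records = (1 to 60).toList
val batches = records.grouped(25).toList
batches.foreach(writeBatch)
// batch sizes: 25, 25, 10
```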

foreachpartition.PNG

foreachpartitonBatch.PNG
