
Spark: Need help identifying the reason for slowness

We have a Spark Streaming application that receives data from Kafka, parses it, and saves it to a database using the code below.

 

val lines: DStream[MetricTypes.InputStreamType] = myConsumer.createDefaultStream()



// Drop null tuples and tuples with a null key or value before parsing.
val keyDeLines = lines.filter(x => x != null && x._1 != null && x._2 != null).map { x =>
  val lmeMap: RawMetricsExtractor = new JsonExtractor[HttpEvent]()
  try lmeMap.aParser(x)
  catch {
    case ase: Exception =>
      // Log the payload that failed to parse and drop the record.
      logError("Error parsing item [" + new GsonBuilder().setPrettyPrinting().create.toJson(x._2) + "]", ase)
      None
  }
  //lmeMap.aParser(x)
}


// Keep only successfully parsed events, then build (event key, enriched key) pairs.
val tempLines = keyDeLines.filter(_ != null).filter(_.isDefined).map(_.get).map { x =>
  val keyExtractMap: KeyExtractor[MetricTypes.EnrichedKeyType] = new EnrichedEventExtractor()
  val eventExtractMap: KeyExtractor[MetricTypes.EventKeyType] = new TopicIdExtractor()

  (eventExtractMap.getKey(x), keyExtractMap.getKey(x))
}


val stateSpec = StateSpec.function(trackStateFunc _).timeout(Minutes(2))
// val requestsWithState = pKeyDlines.mapWithState(stateSpec)
val requestsWithState = tempLines.mapWithState(stateSpec)



requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // One DynamoDB client is created per partition.
    val connection = createNewConnection()
    partitionOfRecords.foreach {
      case (accountId, enrichedId, ets, attributeMap) =>
        if (validateRecordForStorage(accountId, enrichedId, ets, attributeMap)) {
          // A data store wrapper is created for each record, and each record is saved on its own.
          val ds = new DynamoDBDataStore(connection)
          ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
        } else {
          /*logError("Discarded record [enrichedId=" + enrichedId
            + ", accountId=" + accountId
            + ", ets=" + ets
            + ", attributes=" + attributeMap.toString() + "]")*/
          println("Discarded record [enrichedId=" + enrichedId
            + ", accountId=" + accountId
            + ", ets=" + ets
            + "]")
        }

      case default =>
        logInfo("You gave me: " + default)
    }
  }
}


private def createNewConnection(): AmazonDynamoDBClient = {
  val amazonAWSAccessKey = "abcd"
  val amazonAWSSecretKey = "1234"
  val amazonDynamoDBEndpoint = "http:endpoint:9090"
  val client = new AmazonDynamoDBClient(new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey))
  client.setEndpoint(amazonDynamoDBEndpoint)
  client
}
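
To narrow down where the time goes inside foreachPartition, a small timer along these lines could be wrapped around the connection setup and the per-record save. This is only a rough sketch: StepTimer and the step names ("connect", "save") are hypothetical and not part of our job. It relies only on System.nanoTime and is not thread-safe, so it is meant to be created once per partition.

```scala
import scala.collection.mutable

// Hypothetical helper (not part of the original job): accumulates
// wall-clock time per named step so the slowest step stands out.
class StepTimer {
  // Total elapsed nanoseconds per step name; unseen steps start at 0.
  private val totalsNanos =
    mutable.Map.empty[String, Long].withDefaultValue(0L)

  // Runs `body`, adds its elapsed time to the total for `step`,
  // and returns the result of `body` unchanged.
  def timed[A](step: String)(body: => A): A = {
    val start = System.nanoTime()
    try body
    finally totalsNanos(step) += System.nanoTime() - start
  }

  // Snapshot of the accumulated time per step, in milliseconds.
  def totalsMillis: Map[String, Long] =
    totalsNanos.map { case (step, nanos) => step -> nanos / 1000000L }.toMap
}

// Assumed usage inside rdd.foreachPartition (names taken from the code above):
//   val timer = new StepTimer
//   val connection = timer.timed("connect") { createNewConnection() }
//   partitionOfRecords.foreach { record =>
//     timer.timed("save") { /* validate and save the record */ }
//   }
//   println("Partition timings (ms): " + timer.totalsMillis)
```

Printing the totals once per partition keeps the extra logging small while still showing whether the time is spent creating the client or in the individual saves.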


The screenshots below show how much time each task is taking. We are consuming at 10,000/sec but writing only 65,000 in 8.4 minutes. We would greatly appreciate it if someone could point out why our job is so slow; it would be of great help.
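
To put the two rates side by side, here is the arithmetic as a small sketch. It assumes both figures count the same kind of record and that the 65,000 writes are spread evenly over the 8.4 minutes; the object and value names are only illustrative.

```scala
// Illustrative arithmetic only: compares the consume rate with the
// observed write rate, using the figures quoted in this post.
object ThroughputGap {
  val consumedPerSec: Double = 10000.0      // records consumed per second
  val writtenRecords: Double = 65000.0      // records written in the window
  val writeWindowSec: Double = 8.4 * 60     // 8.4 minutes, in seconds

  // Average write rate over the window, in records per second.
  val writtenPerSec: Double = writtenRecords / writeWindowSec

  // How many times faster we consume than we write.
  val gap: Double = consumedPerSec / writtenPerSec

  def main(args: Array[String]): Unit = {
    println("written per second: " + math.round(writtenPerSec))
    println("consume/write ratio: " + math.round(gap))
  }
}
```

That works out to roughly 129 writes/sec against 10,000/sec consumed, so the write side is about 77 times slower than the ingest side.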

 

foreachpartition.PNG

 

foreachpartitonBatch.PNG