03-09-2017 11:52 PM
We have a Spark Streaming application that receives data from Kafka, parses it, and saves it to a database using the code below:
// Raw records consumed from Kafka (each record is accessed as a pair via `_1` / `_2`).
val lines: DStream[MetricTypes.InputStreamType] = myConsumer.createDefaultStream()

// Parse every well-formed record. A record that fails to parse is logged and mapped to None.
val keyDeLines = lines
  // Fix: the original predicate had the field checks nested inside the negated
  // `x.equals(null) && ...` clause, which is always false for a non-null x, so records
  // with a null key or value were never rejected. They are now dropped here.
  .filter(x => x != null && x._1 != null && x._2 != null)
  .map { x =>
    val lmeMap: RawMetricsExtractor = new JsonExtractor[HttpEvent]()
    try lmeMap.aParser(x)
    catch {
      case ase: Exception =>
        logError("Error parsing item [" + new GsonBuilder().setPrettyPrinting().create.toJson(x._2) + "]", ase)
        None
    }
  }

// Keep only successfully parsed events and pair each one as (event key, enriched key).
val tempLines = keyDeLines
  .filter(x => x != null)
  .filter(_.isDefined)
  .map(_.get)
  .map { x =>
    val keyExtractMap: KeyExtractor[MetricTypes.EnrichedKeyType] = new EnrichedEventExtractor()
    val eventExtractMap: KeyExtractor[MetricTypes.EventKeyType] = new TopicIdExtractor()
    (eventExtractMap.getKey(x), keyExtractMap.getKey(x))
  }

// Keyed state is tracked by trackStateFunc; idle keys time out after two minutes.
val stateSpec = StateSpec.function(trackStateFunc _).timeout(Minutes(2))
val requestsWithState = tempLines.mapWithState(stateSpec)

// Persist the stateful output, one DynamoDB client per non-empty partition.
requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Skip empty partitions so no client is created when there is nothing to write.
    if (partitionOfRecords.nonEmpty) {
      val connection = createNewConnection()
      try {
        // The data store only wraps the connection, so build it once per partition
        // instead of once per record.
        val ds = new DynamoDBDataStore(connection)
        partitionOfRecords.foreach {
          case (accountId, enrichedId, ets, attributeMap) =>
            if (validateRecordForStorage(accountId, enrichedId, ets, attributeMap)) {
              ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
            } else {
              println("Discarded record [enrichedId=" + enrichedId + ", accountId=" + accountId + ", ets=" + ets + "]")
            }
          case default =>
            logInfo("You gave me: " + default)
        }
      } finally {
        // Fix: the original never released the client, leaking its resources on every
        // partition of every batch.
        connection.shutdown()
      }
    }
  }
}

/**
 * Builds a DynamoDB client pointed at the configured endpoint.
 *
 * The caller owns the returned client and must call `shutdown()` on it when done.
 *
 * @return a new client using the access key, secret key and endpoint defined below
 */
private def createNewConnection(): AmazonDynamoDBClient = {
  // NOTE(review): credentials and endpoint are hard-coded; they should come from
  // configuration or a credentials provider rather than source code.
  val amazonAWSAccessKey = "abcd"
  val amazonAWSSecretKey = "1234"
  val amazonDynamoDBEndpoint = "http:endpoint:9090"
  val client = new AmazonDynamoDBClient(new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey))
  client.setEndpoint(amazonDynamoDBEndpoint)
  client
}
This is how much time each task is taking. We are consuming at about 10,000/sec and writing 65,000 per 8.4 min. We would greatly appreciate it if someone could point out why our job is so slow; it would be of great help.