
Spark: Output operation after mapWithState

We have a Spark Streaming application where we consume events from Kafka. We want to aggregate the events over a period of time by the traceid in each event, create an aggregate event for that traceid, and write the aggregated event into a database.

Our events look like this:

traceid: 123
{
  info: abc;
}

traceid: 123
{
  info: bcd;
}

What we want to achieve is to create an aggregate event over a period of time, say 2 minutes, and write the aggregated event into the database instead of the individual events:

traceid: 123
{
  info: abc,bcd
}

We used mapWithState and came up with this code:

import org.apache.spark.streaming.{Minutes, StateSpec}

// Keep per-key state, evicting entries that receive no events for 2 minutes
val stateSpec = StateSpec.function(trackStateFunc _).timeout(Minutes(2))
val requestsWithState = tempLines.mapWithState(stateSpec)

requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // One connection per partition, created on the executor rather than the driver
    val connection = createNewConnection()
    partitionOfRecords.foreach {
      case (accountId, enrichedId, ets, attributeMap) =>
        if (validateRecordForStorage(accountId, enrichedId, ets, attributeMap)) {
          // Persist the aggregated record
          val ds = new DBDataStore(connection)
          ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
        } else {
          println("Discarded record [enrichedId=" + enrichedId
            + ", accountId=" + accountId
            + ", ets=" + ets
            + "]")
        }
      case default =>
        logInfo("You gave me: " + default)
    }
  }
}
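For reference, trackStateFunc itself is not shown above. A minimal sketch of what such an update function can look like (hypothetical, simplified to a traceid -> accumulated info mapping rather than the (accountId, enrichedId, ets, attributeMap) tuples matched in the foreachRDD):

import org.apache.spark.streaming.State

// Hypothetical sketch: key = traceid, value = one event's info field,
// state = all info values accumulated so far for that traceid.
def trackStateFunc(traceid: String, info: Option[String],
                   state: State[List[String]]): Option[(String, List[String])] = {
  val accumulated = state.getOption.getOrElse(Nil) ++ info.toList
  if (!state.isTimingOut()) {
    state.update(accumulated) // updating is not allowed once the state is timing out
  }
  Some((traceid, accumulated)) // a mapped value is returned for every record, every batch
}

Written this way, a mapped value is produced for each incoming record in each batch, which is consistent with the behaviour described below.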

The mapWithState aggregation works fine, but our understanding was that it should start writing to the database only after 2 minutes. Instead, we notice that it starts writing to the database immediately, without waiting for 2 minutes, so our understanding is clearly not right. If someone can guide us in achieving our goal of writing to the database only after aggregating for 2 minutes, it would greatly help.
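
Note: the timeout in StateSpec controls when idle state is evicted, not when mapWithState produces output; mapWithState returns a mapped element for every input record in every batch, which would explain the immediate writes. A sketch of one possible pattern for emitting only after the timeout fires (hypothetical; trackStateFuncOnTimeout is not from the code above):

import org.apache.spark.streaming.State

// Hypothetical sketch: suppress per-event output and emit one record per
// traceid only when its state times out (~2 minutes after its last event).
def trackStateFuncOnTimeout(traceid: String, info: Option[String],
                            state: State[List[String]]): Option[(String, List[String])] = {
  if (state.isTimingOut()) {
    // Timeout fired: emit the final aggregate for this traceid
    Some((traceid, state.get()))
  } else {
    // Normal update: fold the new value into the state, emit nothing yet
    state.update(state.getOption.getOrElse(Nil) ++ info.toList)
    None
  }
}

Downstream, the Nones would be dropped before writing, e.g. requestsWithState.flatMap(opt => opt).foreachRDD { ... }. Note that mapWithState requires checkpointing to be enabled, and timeouts are only evaluated as new batches run.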