Spark map vs foreachRdd

Solved

New Contributor

We have a Spark Streaming application where we receive a DStream from Kafka and need to store it to DynamoDB. I'm experimenting with two ways to do it, as described in the code below.

requestsWithState is a DStream of (id, eventStream) pairs.

Code Snippet 1 with foreachRDD:

requestsWithState.foreachRDD { rdd =>
  println("Data being populated to Pulsar")
  rdd.foreach { case (id, eventStream) =>
    println("id is " + id + " Event is " + eventStream)
    val dynamoConnection = setupDynamoClientConnection()
    DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}

Code Snippet 2 with map:

requestsWithState.map { rdd =>
  rdd match {
    case (id, eventStream) =>
      println("id is " + id + " Event is " + eventStream)
      val dynamoConnection = setupDynamoClientConnection()
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}

requestsWithState.print(1)


Code Snippet 1 works fine and populates the database; the second code snippet doesn't work. We would love to know the reason behind it and how we can make it work. The reason we are experimenting (we know map is a transformation and foreachRDD is an action) is that foreachRDD is very slow for our use case under heavy load on the cluster, and we found that map is much faster if we can get it working. Please help us get the map code working.
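For context on the two snippets: DStream transformations such as map are lazy, so nothing executes until an output action forces each batch. A minimal sketch of this (reusing setupDynamoClientConnection and DBUtils from the post above; the foreachRDD at the end is illustrative, not the poster's code):

```scala
// map only *describes* a computation; no records are processed yet.
val mapped = requestsWithState.map { case (id, eventStream) =>
  val dynamoConnection = setupDynamoClientConnection()
  DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  (id, eventStream)
}

// print(1) is an output action, but it only computes enough of each batch
// to display the first record, so most of the side-effecting writes in the
// map above may never run.
mapped.print(1)

// To force every record through the map, an output action that touches the
// whole RDD is needed, e.g.:
mapped.foreachRDD { rdd => rdd.foreach { _ => () } }
```

Note that calling print(1) on the original requestsWithState (as in snippet 2) does not trigger the map at all, since the mapped stream is a separate DStream.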

1 ACCEPTED SOLUTION

Re: Spark map vs foreachRdd

Expert Contributor

What exception or error do you see with snippet 2? Just curious.

foreachRDD is the prescribed method for writing to external systems, so you should be using foreachRDD. The outer loop executes on the driver and the inner loop on the executors, which run on remote machines in the cluster. However, in the code above it's not clear how dynamoConnection would be available to the executors, since such network connections are usually not serializable.

Or is the following line inadvertently missing from snippet 1?

val dynamoConnection = setupDynamoClientConnection()

If yes, then the slowness could stem from repeatedly creating a DynamoDB client connection for each record. The recommended pattern is to use foreachPartition() to create the connection once per partition, and then iterate over that partition's records to write them using the shared connection.
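A sketch of that pattern, using the names from the original post (setupDynamoClientConnection and DBUtils are the poster's helpers, not a standard API):

```scala
requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One connection per partition, created on the executor itself, so no
    // non-serializable connection object is captured from the driver, and
    // the setup cost is paid once per partition instead of once per record.
    val dynamoConnection = setupDynamoClientConnection()
    partition.foreach { case (id, eventStream) =>
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
    }
  }
}
```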

For more info, please search for foreachPartition in http://spark.apache.org/docs/latest/streaming-programming-guide.html

3 REPLIES 3

Re: Spark map vs foreachRdd

New Contributor

@Bikas thanks a lot for your suggestion! The link you provided doesn't seem to be working. Could you please provide some sample code showing how to write foreachPartition, as I'm not familiar with it?


Re: Spark map vs foreachRdd

Expert Contributor

Fixed the link. It has the code.
