
Spark map vs foreachRDD

Rising Star

We have a Spark Streaming application where we receive a DStream from Kafka and need to store it in DynamoDB. I'm experimenting with two ways to do it, as described in the code below.

requestsWithState is a DStream.

Code Snippet 1 with foreachRDD:

requestsWithState.foreachRDD { rdd =>
  // Runs on the driver, once per batch
  println("Data being populated to DynamoDB")
  rdd.foreach { case (id, eventStream) =>
    // Runs on the executors, once per record
    println("id is " + id + " Event is " + eventStream)
    val dynamoConnection = setupDynamoClientConnection()
    DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}

Code Snippet 2 with map:

requestsWithState.map { rdd =>
  rdd match {
    case (id, eventStream) =>
      println("id is " + id + " Event is " + eventStream)
      val dynamoConnection = setupDynamoClientConnection()
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}

requestsWithState.print(1)


Code snippet 1 works fine and populates the database; the second snippet doesn't work. We would love to know the reason behind that, and how to make it work. The reason we are experimenting (we know map is a transformation and foreachRDD is an action) is that foreachRDD is very slow for our use case under heavy load on the cluster, and we found that map is much faster if we can get it working. Please help us get the map version working.

1 ACCEPTED SOLUTION

Super Collaborator

What exception or error do you get with snippet 2? Just curious.

foreachRDD is the prescribed method for writing to external systems, so you should be using foreachRDD. The outer loop executes on the driver and the inner loop on the executors, which run on remote machines in the cluster. Note that a connection created on the driver cannot simply be used inside the inner loop, since such network connections are usually not serializable.
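
To illustrate that pitfall (a minimal sketch, reusing the helpers from your snippets): if the connection were created once on the driver, Spark would have to serialize it into the task closure, which typically fails:

// Created on the driver...
val dynamoConnection = setupDynamoClientConnection()

requestsWithState.foreachRDD { rdd =>
  rdd.foreach { case (id, eventStream) =>
    // ...but used on the executors: Spark must ship dynamoConnection
    // inside this closure, which usually fails with
    // "org.apache.spark.SparkException: Task not serializable"
    DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}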

In snippet 1, however, the connection is created inside the inner loop:

val dynamoConnection = setupDynamoClientConnection()

That line runs once for every record, so the slowness most likely stems from repeatedly creating a DynamoDB client connection. The recommended pattern is to use foreachPartition() to create the connection once per partition, and then iterate over that partition's records to write them using the shared connection.

For more information, search for foreachPartition in the Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html
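
A minimal sketch of that pattern, reusing the setupDynamoClientConnection() and DBUtils.putItem() helpers from your snippets:

requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One connection per partition, created on the executor itself,
    // so nothing has to be serialized from the driver
    val dynamoConnection = setupDynamoClientConnection()
    partition.foreach { case (id, eventStream) =>
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
    }
  }
}

This amortizes the connection setup cost over all the records in a partition instead of paying it once per record.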


3 REPLIES


Rising Star

@Bikas thanks a lot for your suggestion! The link you provided doesn't seem to be working. Could you please provide some sample code showing how to write foreachPartition? I'm not familiar with it.

Super Collaborator

Fixed the link. It has the code.