Created 02-22-2017 06:24 AM
We have a Spark Streaming application where we receive a DStream from Kafka and need to store it to DynamoDB. I'm experimenting with two ways to do it, as described in the code below.
requestsWithState is a DStream.
Code Snippet 1 with foreachRDD:
requestsWithState.foreachRDD { rdd =>
  println("Data being populated to Pulsar")
  rdd.foreach { case (id, eventStream) =>
    println("id is " + id + " Event is " + eventStream)
    val dynamoConnection = setupDynamoClientConnection()
    DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}
Code Snippet 2 with map:
requestsWithState.map { rdd =>
  rdd match {
    case (id, eventStream) =>
      println("id is " + id + " Event is " + eventStream)
      val dynamoConnection = setupDynamoClientConnection()
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}
requestsWithState.print(1)
Code snippet 1 works fine and populates the database; the second code snippet doesn't work. We would love to know the reason behind it and how we can make it work. The reason we are experimenting (we know map is a transformation and foreachRDD is an action) is that foreachRDD is very slow for our use case under heavy load on the cluster, and we found that map is much faster if we can get it working. Please help us get the map code working.
Created 02-23-2017 08:01 AM
What exception/error happens in code 2? Just curious.
foreachRDD is the prescribed method for writing to external systems, so you should be using foreachRDD. The outer loop executes on the driver and the inner loop on the executors; executors run on remote machines in the cluster. However, in the code above it's not clear how dynamoConnection is available to the executors, since such network connections are usually not serializable. Note also that map is a lazy transformation: nothing runs until an action forces it, and print(1) only materializes the first element of each batch, so most of the writes in snippet 2 are never triggered at all.
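To make the driver/executor split concrete, here is the structure of snippet 1 annotated with where each closure runs (the comments reflect standard Spark semantics, not anything specific to your cluster):

requestsWithState.foreachRDD { rdd =>
  // This outer block runs on the driver, once per batch.
  rdd.foreach { case (id, eventStream) =>
    // This inner closure is serialized and shipped to the executors,
    // so anything it captures from the outer scope must be serializable.
    val dynamoConnection = setupDynamoClientConnection() // created per record, on the executor
    DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}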
Or is the following line inadvertently missing from snippet 1?
val dynamoConnection = setupDynamoClientConnection()
If yes, then the slowness could stem from repeatedly creating a dynamoClientConnection for each record. The recommended pattern is to use rdd.foreachPartition() to create the connection once per partition and then iterate over that partition's records to write them using the connection, as in the sketch below.
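A minimal sketch of that pattern, reusing setupDynamoClientConnection() and DBUtils.putItem() from your snippets (untested, so treat it as an outline rather than a drop-in implementation):

requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One connection per partition, created on the executor.
    val dynamoConnection = setupDynamoClientConnection()
    // Reuse that connection for every record in the partition.
    partition.foreach { case (id, eventStream) =>
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
    }
  }
}

This amortizes the connection-setup cost across all the records in a partition instead of paying it once per record.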
For more info, please search for foreachPartition in http://spark.apache.org/docs/latest/streaming-programming-guide.html
Created 02-24-2017 01:34 AM
@Bikas Thanks a lot for your suggestion! The link you provided doesn't seem to be working. Could you please provide some sample code showing how to write foreachPartition, as I'm not familiar with it?
Created 03-01-2017 08:31 AM
Fixed the link. It has the code.