Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Spark : How to make calls to database using foreachPartition

Highlighted

Spark : How to make calls to database using foreachPartition

Explorer

We have spark streaming job ..writing data to AmazonDynamoDB using foreachRDD but it is very slow with our consumption rate at 10,000/sec and writing 10,000 takes 35min ...this is the code piece

From research learnt that using foreachpartition and creating a connection per partition will help ..but not sure how to go about writing code for it ..will greatly appreciate if someone can help with this ...Also any other suggestion to speed up writing is greatly appreciated

tempRequestsWithState.foreachRDD { rdd =>


    if ((rdd != null) && (rdd.count() > 0) && (!rdd.isEmpty()) ) {


      rdd.foreachPartition {


        case (topicsTableName, hashKeyTemp, attributeValueUpdate) => {
          
          val client = new AmazonDynamoDBClient()
          val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
          try client.updateItem(request)




        catch {


            case se: Exception => println("Error executing updateItem!\nTable ", se)


        }


        }


        case null =>




      }
    }
  }
2 REPLIES 2

Re: Spark : How to make calls to database using foreachPartition

@Aditya Mamidala

Here's a working example of foreachPartition that I've used as part of a project. This is part of a Spark Streaming process, where "event" is a DStream, and each stream is written to HBase via Phoenix (JDBC). I have a structure similar to what you tried in your code, where I first use foreachRDD then foreachPartition.

 event.map(x => x._2 ).foreachRDD { rdd =>
    rdd.foreachPartition { rddpartition =>
        val thinUrl = "jdbc:phoenix:phoenix.dev:2181:/hbase"
        val conn = DriverManager.getConnection(thinUrl)
        rddpartition.foreach { record =>
            conn.createStatement().execute("UPSERT INTO myTable VALUES (" + record._1 + ")" )
        }
        conn.commit()
    }
}

The full project is located here.

Highlighted

Re: Spark : How to make calls to database using foreachPartition

Explorer

Thanks for your answer @Dan Zaratsian ..i implemented a solution similar to what you suggested but still facing performance issue could you please take a look at https://community.hortonworks.com/questions/85745/spark-writing-data-to-amazondynamodb-is-slow.html

Don't have an account?
Coming from Hortonworks? Activate your account here