Support Questions

Find answers, ask questions, and share your expertise

[structured-streaming] foreachPartition alternative in structured streaming.

New Contributor

I am reading data from Kafka using Structured Streaming and I need to save the data to InfluxDB. In the regular DStreams-based approach I did this as follows:

val messages: DStream[(String, String)] = kafkaStream.map(record =>
  (record.topic, record.value))

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // connection is created on the executor, once per partition
    val influxService = new InfluxService()
    val connection = influxService.createInfluxDBConnectionWithParams(...)
    partitionOfRecords.foreach(record =>
      ABCService.handleData(connection, record._1, record._2)
    )
  }
}

ssc.start()
logger.info("Started Spark-Kafka streaming session")

Note: I create the connection object inside foreachPartition. How do I do this in Structured Streaming? I tried a connection-pooling approach (creating a pool of connections on the master node and passing it to the worker nodes), but the workers could not get the connection pool object. Is there anything obvious I am missing here?


Cloudera Employee
 datasetOfString.writeStream.foreach(new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = {
        // open connection
        true
      }
      def process(record: String): Unit = {
        // write string to connection
      }
      def close(errorOrNull: Throwable): Unit = {
        // close the connection
      }
    }).start()

From the docs of ForeachWriter

Each task will get a fresh serialized-deserialized copy of the provided object

So whatever you initialize outside the ForeachWriter runs only on the driver; the copy each task receives will not carry that state with it.

You need to initialize the connection pool and open the connection in the open method.
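The usual way to get one pool per executor (rather than per task) is to hold it in a `lazy val` inside an `object`: an object's lazy field is initialized at most once per JVM, so every partition processed on an executor reuses the same pool. Here is a minimal sketch of that pattern in plain Scala, with no Spark or InfluxDB dependency. `InfluxConnectionPool`, `getConnection`, and the string "connections" are all hypothetical stand-ins, not a real client API.

```scala
// Hypothetical per-executor pool holder. A `lazy val` inside an `object` is
// initialized at most once per JVM, so on a Spark executor the pool is built
// the first time any task touches it, instead of the driver serializing a
// pool and shipping it to the workers (which fails, as the question found).
object InfluxConnectionPool {
  @volatile private var initCount = 0

  // Stand-in for a real pool of InfluxDB connections.
  lazy val pool: List[String] = {
    initCount += 1          // lets us observe the one-time initialization
    List("conn-1", "conn-2")
  }

  // Hand out a connection; a real pool would track checkouts and returns.
  def getConnection(partitionId: Long): String =
    pool((partitionId % pool.size).toInt)

  // How many times the pool initializer actually ran in this JVM.
  def timesInitialized: Int = initCount
}
```

Inside `ForeachWriter.open(partitionId, version)` you would then call something like `InfluxConnectionPool.getConnection(partitionId)`; every partition handled by the same executor shares the one pool that JVM built.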