[structured-streaming] foreachPartition alternative in structured streaming.


I am reading data from Kafka using Structured Streaming and need to save the data to InfluxDB. With the regular DStreams-based approach I did this as follows:

val messages: DStream[(String, String)] = kafkaStream.map(record =>
  (record.topic, record.value))

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // the connection is created on the executor, once per partition
    val influxService = new InfluxService()
    val connection = influxService.createInfluxDBConnectionWithParams(
      host, port, username, password, database)
    partitionOfRecords.foreach { record =>
      ABCService.handleData(connection, record._1, record._2)
    }
  }
}

ssc.start()
logger.info("Started Spark-Kafka streaming session")
ssc.awaitTermination()

Note: I create the connection object inside foreachPartition. How do I do the same in Structured Streaming? I tried a connection-pooling approach (creating a pool of connections on the driver and passing it to the worker nodes), but the workers could not get the connection pool object. Is there anything obvious I am missing?

1 Reply

Re: [structured-streaming] foreachPartition alternative in structured streaming.

datasetOfString.writeStream.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open the connection; return true to process this partition
    true
  }
  def process(record: String): Unit = {
    // write the string to the connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
}).start()

From the docs of ForeachWriter

Each task will get a fresh serialized-deserialized copy of the provided object

So whatever you initialize outside the ForeachWriter runs only on the driver, and each task only receives whatever survives serialization; live connections and connection pools do not, which is why your workers never saw the pool.

You need to initialize the connection pool and open the connection in the open method instead.
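
For your InfluxDB case, here is a minimal end-to-end sketch. It reuses the InfluxService, ABCService, and the host/port/username/password/database values from your DStreams code, and the InfluxDBConnection type (with a close() method) is a hypothetical stand-in for whatever your service actually returns:

import org.apache.spark.sql.{ForeachWriter, SparkSession}

val spark = SparkSession.builder.appName("kafka-to-influx").getOrCreate()
import spark.implicits._

// Read from Kafka and project (topic, value) as strings
val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // adjust for your cluster
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

val query = messages.writeStream
  .foreach(new ForeachWriter[(String, String)] {
    // hypothetical connection type; replace with what InfluxService returns
    var connection: InfluxDBConnection = _

    def open(partitionId: Long, version: Long): Boolean = {
      // runs on the executor, once per partition per epoch; this is the
      // structured-streaming equivalent of creating the connection
      // inside foreachPartition
      val influxService = new InfluxService()
      connection = influxService.createInfluxDBConnectionWithParams(
        host, port, username, password, database)
      true // true tells Spark to go ahead and process this partition
    }

    def process(record: (String, String)): Unit =
      ABCService.handleData(connection, record._1, record._2)

    def close(errorOrNull: Throwable): Unit = {
      // release the connection whether the epoch succeeded or failed
      if (connection != null) connection.close()
    }
  })
  .start()

query.awaitTermination()

Since open runs on each executor, the connection is created exactly where it is used, so nothing has to be shipped from the driver.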
