
Spark Streaming: save output to MySQL DB

Expert Contributor

I'm working on the Spark Streaming word count example. Is it possible to store the output RDD in a MySQL database using bulk insertion over JDBC? And if so, are there any examples of it?

Thanks in advance


7 REPLIES

Master Collaborator

Yes, perfectly possible. It's not specific to Spark Streaming or even Spark; you'd just use foreachPartition to create and execute a SQL statement via JDBC over a batch of records. The code is just normal JDBC code.

Expert Contributor

Okay, great! Thanks so much, but could you please provide an example?

Expert Contributor

I managed to insert an RDD into the MySQL database! Thanks so much. Here's some sample code if anyone needs it:

 

import java.sql.DriverManager

// Build a small test RDD of four integers
val r = sc.makeRDD(1 to 4)

// Open one connection per partition on the executor, insert each element, then clean up
// (url, username and password are assumed to be defined elsewhere)
r.foreachPartition { it =>
  val conn = DriverManager.getConnection(url, username, password)
  val del = conn.prepareStatement("INSERT INTO tweets (ID, Text) VALUES (?, ?)")
  for (bookTitle <- it) {
    del.setString(1, bookTitle.toString)
    del.setString(2, "my input")
    del.executeUpdate
  }
  del.close()
  conn.close()
}

New Contributor
How do I implement the same in PySpark?

New Contributor

Same question: how do I use writestream.jdbc in PySpark?

Expert Contributor

I am using Apache Spark to collect tweets using twitter4j, and then want to save the data into a MySQL database.
I created a table in the MySQL DB with the columns ID, createdat, source, text, location,
and here's the code I used (I modified a twitter4j example):

 

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.IntParam

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}

object TwitterPopularTags {
  def main(args: Array[String]) {
    val filters = args.takeRight(args.length)

    // Twitter OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", "H2XXXXXX")
    System.setProperty("twitter4j.oauth.consumerSecret", "WjOXXXXXXXXX")
    System.setProperty("twitter4j.oauth.accessToken", "22XXX")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "vRXXXXXX")

    // MySQL connection settings
    val url = "jdbc:mysql://192.168.4.45:3306/twitter"
    val username = "root"
    val password = "123456"
    Class.forName("com.mysql.jdbc.Driver").newInstance

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    println("CURRENT CONFIGURATION:" + System.getProperties().get("twitter4j.oauth.accessTokenSecret"))

    // Print each received tweet as a comma-separated line
    val tweets_toprint = stream.map(tuple => "%s,%s,%s,%s,%s".format(tuple.getId, tuple.getCreatedAt, tuple.getSource, tuple.getText.toLowerCase.replaceAll(",", " "), tuple.getGeoLocation)).print

    // For each micro-batch, open a JDBC connection per partition and insert the tweets
    val tweets = stream.foreachRDD { rdd =>
      rdd.foreachPartition { it =>
        val conn = DriverManager.getConnection(url, username, password)
        val del = conn.prepareStatement("INSERT INTO tweets (ID,CreatedAt,Source,Text,GeoLocation) VALUES (?,?,?,?,?)")
        for (tuple <- it) {
          del.setLong(1, tuple.getId)
          del.setString(2, tuple.getCreatedAt.toString)
          del.setString(3, tuple.getSource)
          del.setString(4, tuple.getText)
          del.setString(5, tuple.getGeoLocation.toString)
          del.executeUpdate
        }
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


I can submit the job normally on Spark, and it prints out some tweets according to my filter, but I can't write data into the DB and get a NullPointerException when I receive a tweet. Here it is:


15/03/18 13:18:17 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 82, node2.com): java.lang.NullPointerException
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:55)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:50)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:50)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:47)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


And I get this error when no tweets are received:

 

15/03/18 13:18:18 ERROR JobScheduler: Error running job streaming job 1426673896000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 85, node2.com): java.lang.NullPointerException
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:55)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:50)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:50)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:47)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

I guess the 2nd error, aborting the job, is normal when no tweets are received, but I don't know what I'm doing wrong to get the first error when a tweet is received. I managed to insert an RDD into the MySQL DB normally in the Spark shell.

Thanks in advance

Master Collaborator

Looks good, although I would recommend closing the statement and connection too.

Also, you're executing an update for every datum. JDBC has an addBatch / executeBatch interface too, I think; it might be faster.
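
As a minimal sketch of what that could look like (assuming the same tweets table, url, username and password as in the code above), the foreachPartition body can queue rows with addBatch and flush them with executeBatch, closing both resources in a finally block. Note that twitter4j's getGeoLocation can return null for tweets without coordinates, which would also explain the NullPointerException above, so the sketch maps null to a SQL NULL instead of calling toString on it:

rdd.foreachPartition { it =>
  val conn = DriverManager.getConnection(url, username, password)
  val stmt = conn.prepareStatement("INSERT INTO tweets (ID,CreatedAt,Source,Text,GeoLocation) VALUES (?,?,?,?,?)")
  try {
    for (tuple <- it) {
      stmt.setLong(1, tuple.getId)
      stmt.setString(2, tuple.getCreatedAt.toString)
      stmt.setString(3, tuple.getSource)
      stmt.setString(4, tuple.getText)
      // getGeoLocation may be null; insert SQL NULL rather than calling toString on null
      stmt.setString(5, Option(tuple.getGeoLocation).map(_.toString).orNull)
      stmt.addBatch()    // queue the row instead of executing it immediately
    }
    stmt.executeBatch()  // send all queued inserts for the partition in one round trip
  } finally {
    stmt.close()         // close the statement and connection even if an insert fails
    conn.close()
  }
}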