
Spark Streaming save output to mysql DB

Solved

Expert Contributor

I'm working on the Spark Streaming word count example. Is it possible to store the output RDD into a MySQL database using bulk insertion via JDBC? And if so, are there any examples for it?

 

Thanks in advance 


6 REPLIES

Re: Spark Streaming save output to mysql DB

Master Collaborator

Yes, perfectly possible. It's not specific to Spark Streaming or even Spark; you'd just use foreachPartition to create and execute a SQL statement via JDBC over a batch of records. The code is just normal JDBC code.
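
For instance, for a word-count DStream the pattern looks roughly like this (a sketch, not tested; `wordCounts`, the `words(word, count)` table, and the `url`/`username`/`password` credentials are placeholders you would substitute):

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { it =>
    // one connection per partition, opened on the executor side
    val conn = java.sql.DriverManager.getConnection(url, username, password)
    val stmt = conn.prepareStatement("INSERT INTO words (word, count) VALUES (?, ?)")
    for ((word, count) <- it) {
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  }
}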


Re: Spark Streaming save output to mysql DB

Expert Contributor

Okay, great! Thanks so much. But could you provide an example, please?

Re: Spark Streaming save output to mysql DB

Expert Contributor

I managed to insert an RDD into the MySQL database! Thanks so much. Here's some sample code if anyone needs it:

 

import java.sql.DriverManager

val r = sc.makeRDD(1 to 4)

r.foreachPartition { it =>
  // one connection per partition, created on the executor
  val conn = DriverManager.getConnection(url, username, password)
  val del = conn.prepareStatement("INSERT INTO tweets (ID, Text) VALUES (?, ?)")
  for (bookTitle <- it) {
    del.setString(1, bookTitle.toString)
    del.setString(2, "my input")
    del.executeUpdate
  }
  del.close()
  conn.close()
}


Re: Spark Streaming save output to mysql DB

New Contributor
How do I implement the same in PySpark?

Re: Spark Streaming save output to mysql DB

Expert Contributor

I am using Apache Spark to collect tweets with twitter4j, and I want to save the data into a MySQL database.
I created a table in the MySQL DB with columns ID, CreatedAt, Source, Text, and Location.
Here's the code I used (I modified a twitter4j example):

 

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import java.sql.DriverManager

object TwitterPopularTags {
  def main(args: Array[String]) {
    val filters = args.takeRight(args.length)

    System.setProperty("twitter4j.oauth.consumerKey", "H2XXXXXX")
    System.setProperty("twitter4j.oauth.consumerSecret", "WjOXXXXXXXXX")
    System.setProperty("twitter4j.oauth.accessToken", "22XXX")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "vRXXXXXX")

    val url = "jdbc:mysql://192.168.4.45:3306/twitter"
    val username = "root"
    val password = "123456"
    Class.forName("com.mysql.jdbc.Driver").newInstance

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    println("CURRENT CONFIGURATION:" + System.getProperties().get("twitter4j.oauth.accessTokenSecret"))

    // print a comma-separated view of each tweet
    val tweets_toprint = stream.map(tuple => "%s,%s,%s,%s,%s".format(
      tuple.getId, tuple.getCreatedAt, tuple.getSource,
      tuple.getText.toLowerCase.replaceAll(",", " "), tuple.getGeoLocation)).print

    // insert each micro-batch into MySQL, one connection per partition
    val tweets = stream.foreachRDD { rdd =>
      rdd.foreachPartition { it =>
        val conn = DriverManager.getConnection(url, username, password)
        val del = conn.prepareStatement(
          "INSERT INTO tweets (ID,CreatedAt,Source,Text,GeoLocation) VALUES (?,?,?,?,?)")
        for (tuple <- it) {
          del.setLong(1, tuple.getId)
          del.setString(2, tuple.getCreatedAt.toString)
          del.setString(3, tuple.getSource)
          del.setString(4, tuple.getText)
          del.setString(5, tuple.getGeoLocation.toString)
          del.executeUpdate
        }
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


I can submit the job to Spark normally, and it prints out some tweets according to my filter, but I can't write the data into the DB, and I get a NullPointerException when a tweet is received. Here it is:


15/03/18 13:18:17 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 82, node2.com): java.lang.NullPointerException
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:55)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:50)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:50)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:47)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


and I get this error when no tweets are received:

 

15/03/18 13:18:18 ERROR JobScheduler: Error running job streaming job 1426673896000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 85, node2.com): java.lang.NullPointerException
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:55)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1$$anonfun$apply$2.apply(PopularHashTags.scala:50)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:50)
at TwitterPopularTags$$anonfun$2$$anonfun$apply$1.apply(PopularHashTags.scala:47)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

I guess the second error is normal, aborting the job when no tweets are received, but I don't know what I'm doing wrong to get the first error when a tweet is received. I managed to insert an RDD into the MySQL DB normally in the spark-shell.

thanks in advance

Re: Spark Streaming save output to mysql DB

Master Collaborator

Looks good, although I would recommend closing the statement and connection too.

Also, you're executing an update for every datum. JDBC has an addBatch / executeBatch interface too, I think? It might be faster.
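
For example, the insert loop above could be batched like this (a sketch, assuming the same url, username, password, and tweets table as in the code above):

rdd.foreachPartition { it =>
  val conn = DriverManager.getConnection(url, username, password)
  try {
    val stmt = conn.prepareStatement("INSERT INTO tweets (ID, Text) VALUES (?, ?)")
    for (tuple <- it) {
      stmt.setLong(1, tuple.getId)
      stmt.setString(2, tuple.getText)
      stmt.addBatch()       // queue the row instead of executing it immediately
    }
    stmt.executeBatch()     // execute all queued rows at once
    stmt.close()
  } finally {
    conn.close()            // always release the connection, even on failure
  }
}

With MySQL Connector/J, adding rewriteBatchedStatements=true to the JDBC URL reportedly lets the driver rewrite the batch into multi-row INSERT statements, which can speed bulk inserts up further.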