How to write data from a DStream into a permanent Hive table


Hello,

  I am trying to build a simple Spark Streaming application that reads new data from HDFS every 5 seconds and inserts it into a Hive table. On the official Spark web site I found an example of how to perform SQL operations on DStream data via the foreachRDD function, but the catch is that the example uses a sqlContext and converts the RDD to a DataFrame. The problem is that this DataFrame cannot be saved (appended) to an existing permanent Hive table; a HiveContext has to be created.
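
For reference, the example from the Spark Streaming guide looks roughly like this (quoted from memory, so the DStream name words and the column name "word" are the guide's, not mine):

import org.apache.spark.sql.SQLContext

words.foreachRDD { rdd =>
  // Get the lazily instantiated singleton SQLContext (a plain SQLContext, not a HiveContext)
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert the RDD[String] to a DataFrame and query it as a temporary table
  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.registerTempTable("words")
  sqlContext.sql("select word, count(*) as total from words group by word").show()
}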

 

So I tried the program below. It works, but after a while it fails with out-of-memory errors, because it creates a new HiveContext object for every batch.

 

I tried to create the HiveContext BEFORE the map and broadcast it, but that failed at runtime.

I also tried calling getOrCreate, which works fine with a SQLContext but not with a HiveContext.
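
To be concrete, inside foreachRDD this works and returns the same lazily instantiated singleton for every batch:

val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

but I have not found an equivalent way to get a singleton HiveContext back; as far as I can tell there is no HiveContext.getOrCreate in the version I am on.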

 

Any ideas?

Thanks

 

Tomas

 

(Attached screenshot: Snímka.PNG)

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StreamHDFSdata")
sparkConf.set("spark.dynamicAllocation.enabled", "false")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("/user/hdpuser/checkpoint")
val sc = ssc.sparkContext

// Every 5s batch: read new files from HDFS and turn each ";"-separated line into a Row
val smDStream = ssc.textFileStream("/user/hdpuser/data")
val smSplitted = smDStream.map( x => x.split(";") ).map( x => Row.fromSeq( x ) )

// Schema of 11 string columns: col0 .. col10
val smStruct = StructType( (0 to 10).toList.map( x => "col" + x.toString ).map( y => StructField( y, StringType, true ) ) )

//val hiveCx = new org.apache.spark.sql.hive.HiveContext(sc)
//val sqlBc = sc.broadcast( hiveCx )

smSplitted.foreachRDD( rdd => {
  //val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) --> a plain sqlContext cannot be used to write to a permanent Hive table
  // Creating a new HiveContext for every batch works, but eventually runs out of memory
  val sqlContext = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
  //val sqlContext = sqlBc.value --> THIS DOES NOT WORK: fails at runtime
  //val sqlContext = HiveContext.getOrCreate(rdd.sparkContext) --> THIS DOES NOT WORK EITHER: fails at runtime

  //import hiveCx.implicits._
  // Apply the schema and append the batch to the permanent Hive table onlinetblsm
  val smDF = sqlContext.createDataFrame( rdd, smStruct )
  //val smDF = rdd.toDF
  smDF.registerTempTable("sm")
  val smTrgPart = sqlContext.sql("insert into table onlinetblsm select * from sm")
  smTrgPart.write.mode(SaveMode.Append).saveAsTable("onlinetblsm")
} )
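
What I am essentially looking for is a way to reuse one HiveContext across all batches, i.e. a lazily initialized singleton along the lines of the sketch below (the object name SingletonHiveContext is made up, this is just the idea):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object SingletonHiveContext {
  @transient private var instance: HiveContext = _
  // Create the HiveContext once per JVM and reuse it for every batch
  def getInstance(sc: SparkContext): HiveContext = synchronized {
    if (instance == null) instance = new HiveContext(sc)
    instance
  }
}

// and then inside foreachRDD:
// val sqlContext = SingletonHiveContext.getInstance(rdd.sparkContext)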

 
