CDH 5.4.7, Spark Streaming (1.3.0): writing Kafka messages into Hive leaves too many staging directories that are not cleaned up

This seems to be a bug in Spark; see https://github.com/apache/spark/pull/16325

 

My code is below, and it works. But after a while there are a lot of .hive-stagingXXXX directories.

 

A screenshot of my warehouse directory is attached: hive-staging.png

 

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf()
  .setAppName("SendHistoryConsumer")
  // .setMaster("local[2]") // for debug
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
ssc.checkpoint("/opt/cloudera/checkPoint")

// Kafka configuration
val topicsSet = Set(KafkaConfigConstant.PRIORITY_10)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "serializer.class" -> "kafka.serializer.StringEncoder",
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest"
)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
// messages.print()

// Keep only the message value, drop empty messages, then parse each one
val dStream = messages
  .map(_._2)
  .filter(message => !message.isEmpty)
  .map(parseTopic10)

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty && !rdd.partitions.isEmpty) {
    val hiveCtx = new HiveContext(rdd.sparkContext)
    val sendCallBackPoolDF = hiveCtx.createDataFrame(rdd, classOf[SmsSendHistDataSede])

    hiveCtx.sql("use db")
    sendCallBackPoolDF.registerTempTable("sendCallBack")

    // Allow dynamic partitioning on partittime
    hiveCtx.sql("set hive.exec.dynamic.partition=true")
    hiveCtx.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    val smTrgPart = hiveCtx.sql("insert into table SmsSendHistData_orc partition" +
      "(partittime) select userid,city,mobile,msgcount,msgbody,sender,accepttype,sendcode," +
      "msgid,bmsgid,usermsgid,userbulkmsgid,channelmsgid,channelid,fileid,notifyurl,smscode,smsstat," +
      "notifycallnum,sendsmsbody,parentmsgid,errormessage,queuename,startdeliveryid,acctime,sendtime," +
      "notifytime,smstype,from_unixtime(floor(acctime/1000),'yyyyMMdd') as partittime from sendCallBack")
    smTrgPart.saveAsTable("SmsSendHistData_orc", SaveMode.Append)
  }
}

 

1 ACCEPTED SOLUTION

Yeah, that is expected behavior. Each batch writes to a staging directory, and when it is done the data is moved to the actual table/partition directory. I have seen these same staging directories being left behind. In general, if the data is moved successfully, no data is left in them. I ended up with a separate process that would check for entries since the last run (regular Spark jobs, not streaming), check whether each directory was empty, remove it if so, and repeat. I also used these directory checks to see whether something had gone wrong in a job, since in that case the data would remain.
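The cleanup process described above could look roughly like the sketch below. It is only an illustration under assumptions, not the code that was actually used: the object name `StagingDirCleaner`, the one-hour age guard, and treating `_SUCCESS` marker files as "no data" are all hypothetical choices. It relies on the Hadoop `FileSystem` API (`listStatus`, `listFiles`, `delete`) and scans only the directories directly under the path you give it, so it would need to be pointed at each table or partition directory where staging directories appear.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

object StagingDirCleaner {
  // Marker files that are not counted as data (assumption based on this thread).
  private val markerFiles = Set("_SUCCESS", "_success")

  // Decision rule: a staging directory that is old enough and holds nothing but marker files.
  def isRemovable(dirName: String, fileNames: Seq[String], ageMs: Long, minAgeMs: Long): Boolean =
    dirName.startsWith(".hive-staging") &&
      ageMs >= minAgeMs &&
      fileNames.forall(markerFiles.contains)

  // Names of all files below dir at any depth (subdirectories themselves are not listed).
  private def fileNamesUnder(fs: FileSystem, dir: Path): Seq[String] = {
    val names = ArrayBuffer.empty[String]
    val it = fs.listFiles(dir, true)
    while (it.hasNext) names += it.next().getPath.getName
    names.toList
  }

  // Deletes removable staging directories directly under parent and returns the deleted paths.
  def clean(fs: FileSystem, parent: Path, minAgeMs: Long): Seq[Path] = {
    val now = System.currentTimeMillis()
    val removable = fs.listStatus(parent).toList
      .filter(_.isDirectory)
      .filter { st =>
        isRemovable(st.getPath.getName, fileNamesUnder(fs, st.getPath),
          now - st.getModificationTime, minAgeMs)
      }
      .map(_.getPath)
    removable.foreach(p => fs.delete(p, true))
    removable
  }

  def main(args: Array[String]): Unit = {
    // args(0): directory to scan, e.g. the table directory in the warehouse
    val fs = FileSystem.get(new Configuration())
    val deleted = clean(fs, new Path(args(0)), minAgeMs = 60L * 60 * 1000)
    deleted.foreach(p => println(s"removed $p"))
  }
}
```

The age guard is there so that a staging directory belonging to a batch that is still running is not deleted; the modification time is only a heuristic for that. Any directory that is skipped because it still contains data files is worth looking at, since that is the sign of a failed move mentioned above.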


5 REPLIES

It seems that every streaming batch creates a .hive-staging directory. Any suggestions?

@mbigelow Some of these directories contain only a _success file, which is the result of a batch. In the Spark Streaming scenario, should I periodically remove the empty directories, since there are so many of them?

 

Could you explain your solution in more detail? I'm a stupid guy.

@mbigelow My active partition's data is also split into small files. It's really strange: smallfiles.png

I added two background threads: one to delete empty directories and another to run Hive CONCATENATE. But it's a really ugly way to do it.
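For anyone who wants to try the same workaround, here is a minimal sketch of what the two background tasks might look like. It is not the code from this thread. The object name `BackgroundMaintenance`, the JDBC URL, and the schedule are all assumptions. It assumes the Hive JDBC driver is on the classpath and that HiveServer2 is reachable, and it runs Hive's `ALTER TABLE ... PARTITION (...) CONCATENATE` statement to merge the small ORC files of one partition.

```scala
import java.sql.DriverManager
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object BackgroundMaintenance {
  // Builds the Hive statement that merges the small files of one partition.
  def concatenateSql(table: String, partitionCol: String, partitionValue: String): String =
    s"ALTER TABLE $table PARTITION ($partitionCol='$partitionValue') CONCATENATE"

  // Runs one statement over JDBC; jdbcUrl is something like "jdbc:hive2://host:10000/db" (placeholder).
  def runOnHive(jdbcUrl: String, sql: String): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val stmt = conn.createStatement()
      try { stmt.execute(sql); () } finally stmt.close()
    } finally conn.close()
  }

  // Wraps a task so an exception is logged instead of silently cancelling later runs.
  private def guarded(name: String)(task: () => Unit): Runnable = new Runnable {
    def run(): Unit =
      try task()
      catch { case e: Exception => System.err.println(s"$name failed: ${e.getMessage}") }
  }

  // Starts both tasks on a two-thread scheduler; call shutdown() on the result to stop them.
  def start(cleanup: () => Unit, concatenate: () => Unit, periodMinutes: Long): ScheduledExecutorService = {
    val pool = Executors.newScheduledThreadPool(2)
    pool.scheduleWithFixedDelay(guarded("cleanup")(cleanup), periodMinutes, periodMinutes, TimeUnit.MINUTES)
    pool.scheduleWithFixedDelay(guarded("concatenate")(concatenate), periodMinutes, periodMinutes, TimeUnit.MINUTES)
    pool
  }
}
```

The `cleanup` function is whatever deletes the empty staging directories, and `concatenate` would call `runOnHive` with the statement from `concatenateSql`. It is probably safer to concatenate only partitions that are no longer being written to (for example the previous day's `partittime`) rather than the active one.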