
CDH 5.4.7 spark-streaming(1.3.0) kafka message into hive, too many staging directory not cleaned

SOLVED

This seems to be a Spark bug; see https://github.com/apache/spark/pull/16325

 

My code is below, and it works. But after a while there are a lot of .hive-stagingXXXX directories left behind.

 

A screenshot of my warehouse directory is attached: hive-staging.png

 

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("SendHistoryConsumer")
  // .setMaster("local[2]") // for debugging

val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
ssc.checkpoint("/opt/cloudera/checkPoint")

// Kafka configuration
val topicsSet = Set(KafkaConfigConstant.PRIORITY_10)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "serializer.class" -> "kafka.serializer.StringEncoder",
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest"
)  // this closing parenthesis was missing in the original post

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
// messages.print()

val dStream = messages
  .map(_._2)
  .filter(message => !message.isEmpty)
  .map(parseTopic10)

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty && !rdd.partitions.isEmpty) {

    val hiveCtx = new HiveContext(rdd.sparkContext)
    val sendCallBackPoolDF = hiveCtx.createDataFrame(rdd, classOf[SmsSendHistDataSede])

    hiveCtx.sql("use db")
    sendCallBackPoolDF.registerTempTable("sendCallBack")

    hiveCtx.sql("set hive.exec.dynamic.partition=true")
    hiveCtx.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    // The INSERT itself writes the data; smTrgPart is just the (empty) result of the statement.
    val smTrgPart = hiveCtx.sql("insert into table SmsSendHistData_orc partition" +
      "(partittime) select userid,city,mobile,msgcount,msgbody,sender,accepttype,sendcode," +
      "msgid,bmsgid,usermsgid,userbulkmsgid,channelmsgid,channelid,fileid,notifyurl,smscode,smsstat," +
      "notifycallnum,sendsmsbody,parentmsgid,errormessage,queuename,startdeliveryid,acctime,sendtime," +
      "notifytime,smstype,from_unixtime(floor(acctime/1000),'yyyyMMdd') as partittime from sendCallBack")
    smTrgPart.saveAsTable("SmsSendHistData_orc", SaveMode.Append)
  }
}

 

1 ACCEPTED SOLUTION

Re: CDH 5.4.7 spark-streaming(1.3.0) kafka message into hive, too many staging directory not cleaned

Champion
Yeah, that is expected behavior. Each batch writes to a staging directory, and when it finishes, the data is moved to the actual table/partition directory. I have seen these same staging directories left behind. In general, if the data is moved successfully, nothing is left behind. I ended up running a separate process that checked for entries created since the last run (for regular Spark jobs, not streaming), checked whether each directory was empty, removed it if so, and repeated. I also used these directory checks to detect when a job had gone wrong, since in that case the data would remain in the staging directory.
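A minimal sketch of that kind of cleanup job, using the Hadoop FileSystem API. The warehouse path, the `.hive-staging` prefix check, and the "empty or only a _SUCCESS marker" rule are assumptions based on this thread; adjust them to your own layout before running anything destructive:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingDirCleaner {
  def main(args: Array[String]): Unit = {
    // Assumed warehouse location; point this at your own database directory.
    val warehouse = new Path("/user/hive/warehouse/db.db")
    val fs = FileSystem.get(new Configuration())

    // Walk each table directory and look for leftover .hive-staging directories.
    fs.listStatus(warehouse).filter(_.isDirectory).foreach { table =>
      fs.listStatus(table.getPath)
        .filter(s => s.isDirectory && s.getPath.getName.startsWith(".hive-staging"))
        .foreach { staging =>
          val contents = fs.listStatus(staging.getPath)
          // Only delete directories that are empty or contain just a _SUCCESS
          // marker; anything else may mean a job failed mid-move and the data
          // deserves a look before it is removed.
          val onlyMarker = contents.forall(_.getPath.getName == "_SUCCESS")
          if (contents.isEmpty || onlyMarker) {
            fs.delete(staging.getPath, true) // recursive delete
          }
        }
    }
    fs.close()
  }
}

Running this on a schedule (cron, Oozie, or a background thread) keeps the warehouse tidy, and any staging directory it refuses to delete is a signal that a batch may not have completed its move.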
5 REPLIES

Re: CDH 5.4.7 spark-streaming(1.3.0) kafka message into hive, too many staging directory not cleaned

It seems like every streaming batch creates a .hive-staging directory. Any suggestions?


Re: CDH 5.4.7 spark-streaming(1.3.0) kafka message into hive, too many staging directory not cleaned

@mbigelow Some of these directories contain only a _SUCCESS file, which is left by a batch. In a Spark Streaming scenario, should I periodically remove the empty directories, since there are so many of them?

 

Could you explain your solution in more detail? I'm having trouble following it.

Re: CDH 5.4.7 spark-streaming(1.3.0) kafka message into hive, too many staging directory not cleaned

@mbigelow My active partition's data is also split into many small files. It's really strange. (Attachment: smallfiles.png)

Re: CDH 5.4.7 spark-streaming(1.3.0) kafka message into hive, too many staging directory not cleaned

I added two background threads: one to delete the empty directories, and another to run Hive CONCATENATE to merge the small files. But it's really an ugly workaround.
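A hedged sketch of what that two-thread approach could look like. The helper bodies are placeholders (they would wrap the HDFS delete logic and the Hive statements respectively), and the intervals are arbitrary; note also that ALTER TABLE ... CONCATENATE applies to ORC/RCFile tables and may need to be issued through Hive itself (e.g. via JDBC/beeline) rather than the Spark 1.3 HiveContext:

import java.util.concurrent.{Executors, TimeUnit}

object MaintenanceThreads {
  private val scheduler = Executors.newScheduledThreadPool(2)

  def start(): Unit = {
    // Thread 1: periodically sweep leftover empty .hive-staging directories.
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = deleteEmptyStagingDirs()
    }, 10, 30, TimeUnit.MINUTES)

    // Thread 2: periodically merge small ORC files per partition, e.g.
    //   ALTER TABLE SmsSendHistData_orc PARTITION (partittime='20151001') CONCATENATE
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = concatenateRecentPartitions()
    }, 60, 60, TimeUnit.MINUTES)
  }

  // Hypothetical helpers -- fill in with Hadoop FileSystem calls and
  // Hive JDBC statements for your environment.
  private def deleteEmptyStagingDirs(): Unit = ???
  private def concatenateRecentPartitions(): Unit = ???
}

An alternative to merging after the fact is to reduce the number of files written per batch in the first place, e.g. by calling coalesce on the RDD before the insert, at the cost of less write parallelism.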