Created on 01-16-2017 10:53 PM - edited 09-16-2022 03:55 AM
This seems to be a bug in Spark; see https://github.com/apache/spark/pull/16325
My code is below, and it works. But after a while there are a lot of .hive-stagingXXXX directories.
A photo of my warehouse directory is attached.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf()
  .setAppName("SendHistoryConsumer")
  // .setMaster("local[2]") // for debug
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
ssc.checkpoint("/opt/cloudera/checkPoint")

// Kafka configuration
val topicsSet = Set(KafkaConfigConstant.PRIORITY_10)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "serializer.class" -> "kafka.serializer.StringEncoder",
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest"
)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
//messages.print()

val dStream = messages
  .map(_._2)
  .filter(message => !message.isEmpty)
  .map(parseTopic10)

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty && !rdd.partitions.isEmpty) {
    val hiveCtx = new HiveContext(rdd.sparkContext)
    val sendCallBackPoolDF = hiveCtx.createDataFrame(rdd, classOf[SmsSendHistDataSede])
    hiveCtx.sql("use db")
    sendCallBackPoolDF.registerTempTable("sendCallBack")
    // enable dynamic partitioning for the INSERT below
    hiveCtx.sql("set hive.exec.dynamic.partition=true")
    hiveCtx.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    val smTrgPart = hiveCtx.sql("insert into table SmsSendHistData_orc partition" +
      "(partittime) select userid,city,mobile,msgcount,msgbody,sender,accepttype,sendcode," +
      "msgid,bmsgid,usermsgid,userbulkmsgid,channelmsgid,channelid,fileid,notifyurl,smscode,smsstat," +
      "notifycallnum,sendsmsbody,parentmsgid,errormessage,queuename,startdeliveryid,acctime,sendtime," +
      "notifytime,smstype,from_unixtime(floor(acctime/1000),'yyyyMMdd') as partittime from sendCallBack")
    smTrgPart.saveAsTable("SmsSendHistData_orc", SaveMode.Append)
  }
}
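For what it's worth, Hive exposes a hive.exec.stagingdir property (default .hive-staging) that controls where these staging directories are created. A minimal sketch of relocating them out of the table directory, using the hiveCtx from the code above; note this does not fix the leak itself, it only moves the leftovers somewhere easier to clean:

// Sketch: relocate Hive staging directories out of the warehouse.
// hive.exec.stagingdir defaults to ".hive-staging" inside the target
// table directory; pointing it elsewhere keeps the table tree clean,
// but the leftover directories still accumulate and need cleanup.
hiveCtx.sql("set hive.exec.stagingdir=/tmp/hive-staging")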
Created 01-16-2017 10:54 PM
It seems like every streaming batch creates a .hive-staging directory. Any suggestions?
Created on 01-16-2017 11:21 PM - edited 01-16-2017 11:51 PM
@mbigelow Some of these directories only contain a _SUCCESS file, which is the output marker of a batch. In a Spark Streaming scenario, should I periodically remove the empty directories, since there are so many of them?
Could you explain your solution in more detail? I'm still a beginner.
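If you do go the periodic-cleanup route, a minimal sketch with the Hadoop FileSystem API could look like the following (the warehouse path and the one-hour age cutoff are assumptions; adjust both for your setup):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: delete leftover .hive-staging directories older than one hour.
// The table path below is hypothetical; point it at your own table dir.
val fs = FileSystem.get(new Configuration())
val tableDir = new Path("/user/hive/warehouse/db.db/smssendhistdata_orc")
val cutoff = System.currentTimeMillis() - 60 * 60 * 1000
fs.listStatus(tableDir)
  .filter(s => s.isDirectory && s.getPath.getName.startsWith(".hive-staging"))
  .filter(_.getModificationTime < cutoff)
  .foreach(s => fs.delete(s.getPath, true)) // recursive delete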
Created on 01-16-2017 11:25 PM - edited 01-16-2017 11:26 PM
@mbigelow The data in my active partition is also split into many small files. It's really strange.
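On the small-files point: each micro-batch writes its own set of files (one per RDD partition, per Hive partition), so file counts grow with every batch. One common mitigation is to coalesce the DataFrame before the insert so each batch writes fewer files; a sketch against the code above (the factor of 1 is an assumption to tune against your throughput):

// Sketch: reduce output files per batch by coalescing before the INSERT.
// With coalesce(1), each batch writes at most one file per Hive partition,
// at the cost of less write parallelism.
val coalesced = sendCallBackPoolDF.coalesce(1)
coalesced.registerTempTable("sendCallBack")

Existing small ORC files can also be compacted afterwards in Hive with ALTER TABLE ... PARTITION(...) CONCATENATE.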