Member since
08-15-2016
22
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2499 | 08-25-2016 11:12 PM |
02-14-2017
01:29 AM
These flags worked for me:

```
--conf "spark.driver.extraClassPath=/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/htrace-core-3.1.0-incubating.jar:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hive/conf:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hive/lib/*.jar" \
--conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/htrace-core-3.1.0-incubating.jar:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hive/conf:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hive/lib/*.jar" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1024m -XX:PermSize=256m" \
--conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=1024m -XX:PermSize=256m" \
```
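For context, these flags belong on a spark-submit command line. A minimal sketch of a full invocation, assuming YARN and a hypothetical application class and jar (only the first `--conf` is shown; the others follow the same pattern):

```
spark-submit \
  --master yarn \
  --class com.example.StreamingApp \
  --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/htrace-core-3.1.0-incubating.jar:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hive/conf:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hive/lib/*.jar" \
  app.jar
```

Note that `-XX:MaxPermSize`/`-XX:PermSize` only apply on Java 7 and earlier; Java 8 removed the permanent generation and ignores these flags.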
02-13-2017
07:14 PM
I just start a daemon thread with scheduleAtFixedRate that cleans up these "empty" directories containing a _SUCCESS file, and another thread that runs the Hive command `ALTER TABLE xxx ... CONCATENATE`:

```scala
import java.util.Date
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    val fs = FileSystem.get(new Configuration())
    val status = fs.listStatus(new Path(s"hdfs://nameservice/user/xxx/warehouse/$tableName/"))
    status.foreach { stat =>
      if (stat.isDirectory && stat.getPath.getName.contains("hive-staging") &&
          fs.getContentSummary(stat.getPath).getSpaceConsumed < 1024) {
        println("empty path: " + stat.getPath)
        if (directoryHasSuccess(stat.getPath, fs)) {
          fs.delete(stat.getPath, true)
        }
        val now = new Date().getTime
        // only delete directories untouched for the last 5 minutes
        if (now - stat.getModificationTime > 5 * 60 * 1000 &&
            now - stat.getAccessTime > 5 * 60 * 1000) {
          println("delete path " + stat.getPath)
          fs.delete(stat.getPath, true)
        }
      }
    }
  }
}, 5, interval, TimeUnit.SECONDS)
```
01-17-2017
07:45 PM
I added two background threads: one to delete the empty directories, the other to run Hive CONCATENATE. But it's a really ugly workaround.
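For reference, the Hive statement that merges the small ORC files is CONCATENATE; a sketch against the table from my code (the partition value shown is hypothetical):

```sql
ALTER TABLE SmsSendHistData_orc PARTITION (partittime = '20170116') CONCATENATE;
```

This rewrites the partition's many small ORC files into fewer, larger ones; it only works on tables stored as ORC (or RCFile).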
01-16-2017
11:25 PM
@mbigelow The data in my active partition is split into small files too. It's really strange.
01-16-2017
11:21 PM
@mbigelow Some of these directories contain only a _SUCCESS file, which is left behind by a batch. In a Spark Streaming scenario, should I periodically remove the empty directories, since there are so many of them? Could you explain your solution in more detail?
01-16-2017
10:54 PM
It seems that every streaming batch creates a .hive-staging directory. Any suggestions?
01-16-2017
10:53 PM
This seems to be a Spark bug; see https://github.com/apache/spark/pull/16325. My code is below, and it works, but after a while there are a lot of .hive-stagingXXXX directories. A photo of my warehouse is attached.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf()
  .setAppName("SendHistoryConsumer")
  // .setMaster("local[2]") // for debugging
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
ssc.checkpoint("/opt/cloudera/checkPoint")

// Kafka configuration
val topicsSet = Set(KafkaConfigConstant.PRIORITY_10)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "serializer.class" -> "kafka.serializer.StringEncoder",
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest")

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
// messages.print()

val dStream = messages
  .map(_._2)
  .filter(message => !message.isEmpty)
  .map(parseTopic10)

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty && !rdd.partitions.isEmpty) {
    val hiveCtx = new HiveContext(rdd.sparkContext)
    val sendCallBackPoolDF = hiveCtx.createDataFrame(rdd, classOf[SmsSendHistDataSede])
    hiveCtx.sql("use db")
    sendCallBackPoolDF.registerTempTable("sendCallBack")
    hiveCtx.sql("set hive.exec.dynamic.partition=true")
    hiveCtx.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    val smTrgPart = hiveCtx.sql("insert into table SmsSendHistData_orc partition" +
      "(partittime) select userid,city,mobile,msgcount,msgbody,sender,accepttype,sendcode," +
      "msgid,bmsgid,usermsgid,userbulkmsgid,channelmsgid,channelid,fileid,notifyurl,smscode,smsstat," +
      "notifycallnum,sendsmsbody,parentmsgid,errormessage,queuename,startdeliveryid,acctime,sendtime," +
      "notifytime,smstype,from_unixtime(floor(acctime/1000),'yyyyMMdd') as partittime from sendCallBack")
    smTrgPart.saveAsTable("SmsSendHistData_orc", SaveMode.Append)
  }
}
```
Labels:
- Apache Hive
- Apache Kafka
- Apache Spark
01-15-2017
11:30 PM
I have the same problem on CDH 5.4.7, after a streaming job with HiveContext.
08-25-2016
11:12 PM
I have resolved this. It's weird: I just paused the download and retried, and it worked!
08-25-2016
08:23 AM
I have put the parcels on my local httpd server, and I can see that CM can reach them. But when installing the parcels, CM is stuck at 0%. Any ideas? Is there a log I can use to trace this problem?
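For anyone hitting the same wall, the Cloudera Manager logs are the place to look. A sketch of where to check, assuming the standard default log locations (adjust if your installation puts them elsewhere):

```
# Parcel download/distribution is performed by the agents; check the agent
# log on the affected host:
sudo tail -f /var/log/cloudera-scm-agent/cloudera-scm-agent.log

# The CM server log can show why distribution never progresses past 0%:
sudo tail -f /var/log/cloudera-scm-server/cloudera-scm-server.log
```

Distribution errors (bad checksum, unreachable repo URL, disk space) usually show up in the agent log first.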
Labels:
- Cloudera Manager