package *******

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ****** {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setAppName("*********")
      .setMaster("yarn-client")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.speculation", "false")
    sparkConf.set("spark.cores.max", "2")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "**********")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    sparkConf.set("spark.sql.avro.compression.codec", "snappy")
    sparkConf.set("spark.sql.avro.mergeSchema", "true")
    sparkConf.set("spark.sql.avro.binaryAsString", "true")

    val sc = new SparkContext(sparkConf)
    // sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false")
    // sc.addFile("/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/core-site.xml")
    // sc.addFile("/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/hdfs-site.xml")

    // 5-minute micro-batches
    val ssc = new StreamingContext(sc, Seconds(300))

    val kafkaConf = Map[String, String](
      "metadata.broker.list" -> "**********************",
      "zookeeper.connect" -> "************************",
      "group.id" -> "*********",
      "zookeeper.connection.timeout.ms" -> "100000")

    // topic name -> number of receiver threads
    val topicMaps = Map("ugadi1" -> 2)

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

    messages.foreachRDD { rdd =>
      // Reuse a single SQLContext rather than constructing a new one per batch
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

      // Each Kafka message value is expected to be a JSON document
      val myDF = sqlContext.read.json(rdd.map(_._2))
      myDF.show()

      // Append each micro-batch to the Parquet output directory
      myDF.write.format("parquet").mode(SaveMode.Append).save("/Twitter_Handle")
      // myDF.write.mode(SaveMode.Append).parquet("/parquet/test.parquet")
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
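
// A minimal sketch of how this job might be submitted, assuming a Spark 1.6.x /
// Scala 2.10 build and a yarn-client deployment; the jar name, main class, and
// Kafka integration artifact version below are placeholders, not taken from the
// original code:
//
//   spark-submit \
//     --master yarn-client \
//     --class <package>.<MainObject> \
//     --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 \
//     streaming-job.jar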