package org.sabre

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

/**
 * Consumes PSS segment messages from Kafka and lands them under
 * /data/airsell_segment/landing for downstream processing.
 */
object DataIngestion {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: DataIngestion <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("DataIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(1))

    // Map each topic to the number of receiver threads, e.g. "pss" -> 2.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based Kafka stream; keep only the message value, drop the key.
    val msgs = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    msgs.print()

    // Earlier attempts, kept for reference:
    //   msgs.saveAsTextFiles("/data/airsell_segment/landing/pss/")
    //     -> creates a pss-<timestamp> directory under /data/airsell_segment/landing
    //        for every batch interval.
    //   Collecting batches into an ArrayBuffer on the driver and re-parallelizing
    //   into a single saveAsTextFile call
    //     -> files landed in a single directory, but they were empty (no data).

    msgs.foreachRDD { (rdd, time) =>
      println("Writing batch to landing directory")
      // Suffix the path with the batch time: saveAsTextFile fails if the target
      // directory already exists, so a fixed path would crash on the second batch.
      rdd.saveAsTextFile(s"/data/airsell_segment/landing/pss-data1-${time.milliseconds}")
      // Alternatives tried inside this loop:
      //   rdd.repartition(1).saveAsTextFile(...)   // single output file per batch
      //   rdd.toDF(...)                            // 27-column segment DataFrame; full
      //                                            // column list in the sketch below
      //     .write.mode(SaveMode.Append).parquet("/data/airsell_segment/landing/pss-data")
      //   dataFrame.write.format("com.databricks.spark.csv")
      //     .mode(SaveMode.Append).save("/data/airsell_segment/landing/pss")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
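// ---------------------------------------------------------------------------
// A minimal sketch of the DataFrame route that is commented out in main above:
// parse each Kafka message into the 27-column segment schema and append each
// micro-batch as Parquet. Assumptions not in the original code: messages are
// comma-delimited, every field is a string, and the object/method names here
// (SegmentParquetSink, writeBatch) are hypothetical.
// ---------------------------------------------------------------------------
object SegmentParquetSink {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{Row, SQLContext, SaveMode}
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  // The 27 columns from the commented-out rdd.toDF(...) call in main.
  val columns: Seq[String] = Seq(
    "transactiondate", "transactiontime", "sessionid", "agencypcc", "homepcc",
    "actioncd", "messageowner", "pnrlocatorid", "pnrcreatedate",
    "nbrofseatsrequested", "segmentnbr", "segmentnbringrp", "marketingcarrier",
    "marketingflightnbr", "servicestartdate", "servicestarttime",
    "serviceenddate", "serviceendtime", "travelerorigcity", "travelerdestcity",
    "bookingclass", "statuscd", "groupnbr", "availabilitysource", "soldas",
    "operatingcarrier", "errorcd")

  // All-string schema built explicitly; this sidesteps the 22-field case-class
  // limit that blocks rdd.toDF(...) with 27 columns on Scala 2.10.
  val schema: StructType =
    StructType(columns.map(StructField(_, StringType, nullable = true)))

  // Parse one batch and append it as Parquet. Empty batches are skipped so
  // each interval does not leave behind empty part files.
  def writeBatch(sqlContext: SQLContext, rdd: RDD[String], path: String): Unit = {
    if (!rdd.isEmpty()) {
      val rows = rdd.map { line =>
        // split with limit -1 keeps trailing empty fields; padTo guards short lines
        Row.fromSeq(line.split(",", -1).padTo(columns.length, "").toSeq)
      }
      sqlContext.createDataFrame(rows, schema)
        .write.mode(SaveMode.Append).parquet(path)
    }
  }
}
// Usage inside main, replacing the saveAsTextFile call:
//   msgs.foreachRDD { rdd =>
//     SegmentParquetSink.writeBatch(sqlContext, rdd, "/data/airsell_segment/landing/pss-data")
//   }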