package com.informatica.exec

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object Spark0 {
  def main(args: Array[String]): Unit = {
    // The generated code names the SparkSession "sqlContext"; the name is kept here.
    val sqlContext = SparkSession.builder()
      .enableHiveSupport()
      .master("local[*]")
      .getOrCreate()
    import sqlContext.implicits._

    val checkpointDir = "/tmp/1hjhk-wedd-wqedsd-qq2112-dssd101"

    // Read the raw Kafka stream; each record's value carries a JSON document.
    val v1 = sqlContext.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:49092")
      .option("subscribePattern", "source1")
      .load()

    // Schema of the JSON payload in the Kafka message value.
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("timestamp", TimestampType, true)))

    // Parse the value column as JSON and flatten it into typed columns.
    val stream = v1.selectExpr("cast(value as string) as json")
      .select(from_json($"json", schema) as "data")
      .select("data.*")

    // Write micro-batches into the Hive table every 20 seconds through the
    // Hive Streaming data source. The metastore host is left blank in the
    // generated code and must be supplied for a real cluster.
    stream.writeStream
      .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
      .option("metastoreUri", "thrift://:9083")
      .option("database", "default")
      .option("table", "hive_target")
      .trigger(Trigger.ProcessingTime(20000L))
      .option("checkpointLocation", checkpointDir + "/Write_hive_target")
      .queryName("Write_hive_target")
      .start()

    sqlContext.streams.awaitAnyTermination()
  }
}
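
// A minimal test-producer sketch, not part of the generated job: it publishes one
// JSON record matching the schema parsed by Spark0 to the "source1" topic so the
// stream has data to consume. It assumes the org.apache.kafka:kafka-clients library
// is on the classpath and a broker is listening on localhost:49092, as configured
// above; the object name, sample values, and timestamp format are illustrative.
object Source1TestProducer {
  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:49092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Payload fields mirror the StructType given to from_json in Spark0; the
    // ISO-style timestamp is assumed to be accepted by the default JSON parser.
    val json = """{"id":1,"name":"alice","age":30,"timestamp":"2021-01-01T00:00:00.000Z"}"""
    // send() returns a java.util.concurrent.Future; get() blocks until the broker acks.
    producer.send(new ProducerRecord[String, String]("source1", json)).get()
    producer.close()
  }
}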