Member since
05-08-2019
5
Posts
0
Kudos Received
0
Solutions
05-17-2019
09:53 AM
Could you please give me in details ... or any sample property files can help me?
... View more
05-15-2019
11:15 AM
Am new to Nifi and want to know, can i populate data from HBase Table to kafka Topics I can find GetHBase Processor - In that i want to know how Hbase client service setup should be and how the out of hbase table's data to kafka. My ultimate goal is to stream the data from HBase ---> kafka ----> Spark Streaming . I thought to use Nifi for Hbase to Kafka?
... View more
Labels:
- Labels:
-
Apache HBase
-
Apache NiFi
-
Apache Spark
05-11-2019
09:48 PM
@dbompart - Tons of Thanks ... That Issue got resolved .... But getting new issues given below. I have checked the spark-default.conf but couldn't find any relevance. 19/05/11 13:03:36 ERROR JobScheduler: Error running job streaming job 1557560015000 ms.2
org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://cch1wpsteris01:8020/user/root/rdd;
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:719)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:693)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:666)
at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:83)
at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:75)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://cch1wpsteris01:8020/user/root/rdd;
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:719)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:693)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:666)
at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:83)
at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:75)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... View more
05-08-2019
11:33 PM
Am new to Hortonworks and trying to setup a Spark to kafka connectivity using scala which is given below as spark consumer. The below given jar built it from scala IDE, and using spark-submit i tried to run the jar from ../spark2/bin path. Help/Guide me to fix this issue. HDP 3.1.0.0 Getting error as below - java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history [root@wpst01 bin]# ./spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar
19/05/08 18:18:15 INFO SparkContext: Running Spark version 2.3.2.3.1.0.0-78
19/05/08 18:18:15 INFO SparkContext: Submitted application: kafkalab
19/05/08 18:18:15 INFO SecurityManager: Changing view acls to: root
19/05/08 18:18:15 INFO SecurityManager: Changing modify acls to: root
19/05/08 18:18:15 INFO SecurityManager: Changing view acls groups to:
19/05/08 18:18:15 INFO SecurityManager: Changing modify acls groups to:
19/05/08 18:18:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
19/05/08 18:18:15 INFO Utils: Successfully started service 'sparkDriver' on port 34204.
19/05/08 18:18:15 INFO SparkEnv: Registering MapOutputTracker
19/05/08 18:18:15 INFO SparkEnv: Registering BlockManagerMaster
19/05/08 18:18:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/05/08 18:18:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/05/08 18:18:15 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-12c9c574-f248-4ec0-9517-205d073614d3
19/05/08 18:18:15 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/05/08 18:18:15 INFO SparkEnv: Registering OutputCommitCoordinator
19/05/08 18:18:15 INFO log: Logging initialized @2147ms
19/05/08 18:18:15 INFO Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-06-05T22:41:56+05:30, git hash: 84205aa28f11a4f31f2a3b86d1bba2cc8ab69827
19/05/08 18:18:15 INFO Server: Started @2241ms
19/05/08 18:18:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/05/08 18:18:15 INFO AbstractConnector: Started ServerConnector@52350abb{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
19/05/08 18:18:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5dda6f9{/jobs,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@287f94b1{/jobs/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@30b34287{/jobs/job,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@62f87c44{/jobs/job/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@48f5bde6{/stages,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@525d79f0{/stages/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5149f008{/stages/stage,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2ca65ce4{/stages/stage/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@327120c8{/stages/pool,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5707c1cb{/stages/pool/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b5cb9b2{/storage,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@35038141{/storage/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@ecf9049{/storage/rdd,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@672f11c2{/storage/rdd/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2970a5bc{/environment,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@50305a{/environment/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@72efb5c1{/executors,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6d511b5f{/executors/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@41200e0c{/executors/threadDump,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40f33492{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4fbdc0f0{/static,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40bffbca{/,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2449cff7{/api,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@37d80fe7{/jobs/job/kill,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@384fc774{/stages/stage/kill,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://wpst01:4041
19/05/08 18:18:16 INFO SparkContext: Added JAR file:/usr/local/src/softwares/com.steris-0.0.1-SNAPSHOT-jar-with-dependencies.jar at spark://wpst01:34204/jars/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1557319696003
19/05/08 18:18:16 INFO Executor: Starting executor ID driver on host localhost
19/05/08 18:18:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43644.
19/05/08 18:18:16 INFO NettyBlockTransferService: Server created on wpst01:43644
19/05/08 18:18:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/05/08 18:18:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO BlockManagerMasterEndpoint: Registering block manager wpst01:43644 with 366.3 MB RAM, BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@47b2e9e1{/metrics/json,null,AVAILABLE,@Spark}
19/05/08 18:18:16 INFO deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
19/05/08 18:18:17 ERROR SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595)
at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:100)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:522)
at org.com.steris.com.steris.Iot_kafka_Consumer$.main(Iot_kafka_Consumer.scala:34)
at org.com.steris.com.steris.Iot_kafka_Consumer.main(Iot_kafka_Consumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/05/08 18:18:17 INFO AbstractConnector: Stopped Spark@52350abb{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
19/05/08 18:18:17 INFO SparkUI: Stopped Spark web UI at http://wpst01:4041
19/05/08 18:18:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/08 18:18:17 INFO MemoryStore: MemoryStore cleared
19/05/08 18:18:17 INFO BlockManager: BlockManager stopped
19/05/08 18:18:17 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/08 18:18:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/08 18:18:17 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595)
at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:100)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:522)
at org.com.st.com.st.Iot_kafka_Consumer$.main(Iot_kafka_Consumer.scala:34)
at org.com.st.com.st.Iot_kafka_Consumer.main(Iot_kafka_Consumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/05/08 18:18:17 INFO ShutdownHookManager: Shutdown hook called
19/05/08 18:18:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-b5f4edb6-cbcb-48d1-b741-76a83cf380d7
19/05/08 18:18:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-f2c592ca-4d7f-4342-9745-01b045baae47 Spark-Submit command ./spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar My Kafka Consumer scala Code package iotloganalytics import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
//import org.json4s.native.JsonFormats.parse
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//import com.datastax.spark.connector._
//import com.datastax.bdp.spark.writer.BulkTableWriter._
//import com.datastax.spark.connector._
//import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.Row
import org.apache.spark.sql
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
object KafkaCons {
def main(args:Array[String])
{
val sparkConf = new SparkConf()
.setAppName("IotAnalytics")
.setMaster("local[*]")
.set("spark.sql.crossJoin.enabled", "true")
val sparkcontext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkcontext)
import sqlContext.implicits._
sparkcontext.setLogLevel("ERROR")
val ssc = new StreamingContext(sparkcontext, Seconds(5))
ssc.checkpoint("file:///tmp/checkpointdir")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "@172.25.122.140:6667",
//"bootstrap.servers" -> "35.209.4.97:9092",
//"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "STE",
"auto.offset.reset" -> "earliest"
)
val topics = Array("STE-DF-OR")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val kafkastream = stream.map(record => (record.key, record.value))
kafkastream.print()
println("Before kafka2")
val inputStream = kafkastream.map(rec => rec._2);
inputStream.print
println("Before kafka3")
inputStream.foreachRDD(rdd=>
{
//val jsonrdd = rdd.filter(_.contains("results"))
if(!rdd.isEmpty)
{
rdd.foreach(println)
val df = sqlContext.read.text("rdd")
df.show
}
})
ssc.start()
ssc.awaitTermination()
}
}
... View more
Labels:
- Labels:
-
Apache Kafka
-
Apache Spark