06-08-2016
07:08 PM
Unfortunately not. I have to do it with the technology I have. I am using Zeppelin, and I have the Elasticsearch and Spark interpreters. Is there any way to do that? It would be really great if I could find a solution.
06-08-2016
06:44 PM
In the streaming context, the messages arriving from Kafka follow one of two patterns: {"item":"bed","source":"Central","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}
{"message":"","tags":["_jsonparsefailure"],"@version":"1","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}
And this is the code:
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import _root_.kafka.serializer.StringDecoder
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.spark.streaming.kafka.KafkaUtils
val ssc = new StreamingContext(sc, Seconds(10))
val topics = Set("...")
val kafkaParams = Map[String, String]("bootstrap.servers" -> "...")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
I want to save the messages that contain "item" and "source" as JSON documents in an Elasticsearch index. What would be the proper solution?
P.S.: I have tried the following, but it didn't work:
messages.foreachRDD { rdd =>
  EsSpark.saveJsonToEs(rdd, "Test/Type")
}
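For reference, one approach is to filter out the "_jsonparsefailure" messages before writing, keeping only documents that carry both the "item" and "source" fields. Below is a minimal sketch of such a predicate in plain Scala (no Spark needed to test it); the `isWellFormed` name is illustrative. Note also that Elasticsearch index names must be lowercase, so "Test/Type" may itself be rejected.

```scala
// Sketch: keep only messages that carry both "item" and "source" fields,
// dropping the "_jsonparsefailure" variants. A simple substring check is
// sufficient here because the failure messages never contain these keys.
def isWellFormed(json: String): Boolean =
  json.contains("\"item\"") && json.contains("\"source\"")

// In the streaming job, the predicate would be applied before saving, e.g.:
//   messages.filter(isWellFormed).foreachRDD { rdd =>
//     EsSpark.saveJsonToEs(rdd, "test/type")  // lowercase index name
//   }
```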
Labels:
- Apache Kafka
- Apache Spark