
How to save streaming messages to Elasticsearch?


In the streaming context, messages arrive from Kafka following one of these patterns:

{"item":"bed","source":"Central","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}
{"message":"","tags":["_jsonparsefailure"],"@version":"1","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}
And this is the code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

val ssc = new StreamingContext(sc, Seconds(10))
val topics = Set("...")
val kafkaParams = Map[String, String]("bootstrap.servers" -> "...")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

I want to save the messages that contain "item" and "source" as JSON documents in an Elasticsearch index. What would be a proper solution?

P.S.: I have tried the following, but it didn't work:

messages.foreachRDD { rdd =>
  EsSpark.saveJsonToEs(rdd, "Test/Type")
}
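A minimal sketch of one way to do this. The predicate `isItemEvent` is a hypothetical helper, and the lowercase index `test/type` is an assumption standing in for `Test/Type`:

```scala
// Hypothetical predicate: keep only records matching the first pattern,
// i.e. JSON containing both "item" and "source", and drop the
// "_jsonparsefailure" records.
def isItemEvent(json: String): Boolean =
  json.contains("\"item\"") && json.contains("\"source\"") && !json.contains("_jsonparsefailure")

// Spark wiring (sketch): filter the DStream, then save each batch.
// Elasticsearch index names must be lowercase, so "test/type" is used here
// instead of "Test/Type", and the StreamingContext must actually be started.
// val wanted = messages.filter(isItemEvent)
// wanted.foreachRDD { rdd =>
//   if (!rdd.isEmpty()) EsSpark.saveJsonToEs(rdd, "test/type")
// }
// ssc.start()
// ssc.awaitTermination()
```

Two things worth checking with the original attempt: Elasticsearch rejects index names containing uppercase letters, so `"Test/Type"` alone can make the save fail, and nothing runs until `ssc.start()` is called.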


1 ACCEPTED SOLUTION

avatar
Master Guru

elasticsearch and Spark

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

I would recommend landing your data in HDFS; then you can point ES or Solr at it.
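The "land it in HDFS first" approach can be sketched as below; `batchPath` is a hypothetical helper, and the base path `hdfs:///data/kafka-landing` is an assumption:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper: build a per-batch HDFS directory from the batch time,
// so each micro-batch lands in its own dated directory.
def batchPath(base: String, epochMillis: Long): String = {
  val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd/HHmmss").withZone(ZoneOffset.UTC)
  s"$base/${fmt.format(Instant.ofEpochMilli(epochMillis))}"
}

// Spark wiring (sketch, assuming the `messages` DStream from the question):
// messages.foreachRDD { (rdd, time) =>
//   if (!rdd.isEmpty()) rdd.saveAsTextFile(batchPath("hdfs:///data/kafka-landing", time.milliseconds))
// }
```

Once the raw JSON is in HDFS, ES or Solr can index it on its own schedule, decoupled from the streaming job.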


3 REPLIES


If you're open to technology alternatives, consider NiFi. It lets you visually connect your Kafka source, filter the data on the JSON path values you need, and post the results to Elasticsearch, all without writing a single line of code.


Unfortunately not. I have to do it with the technology I already have: I am using Zeppelin with the Elasticsearch and Spark interpreters. Is there any way to do that? It would be really great if I could find a solution.
