Created 06-08-2016 06:44 PM
In the streaming context, there are messages from kafka which follow one of the following pattern :
{"item":"bed","source":"Central","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}
{"message":"","tags":["_jsonparsefailure"],"@version":"1","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}
And this is the code :
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import _root_.kafka.serializer.StringDecoder
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.spark.streaming.kafka.KafkaUtils
val ssc = new StreamingContext(sc, Seconds(10))
val topics = Set("...")
val kafkaParams = Map[String, String]("bootstrap.servers" -> "...")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
I want to save those messages which contain "item" and "source" in format of json in an index in elasticsearch. what can be the proper solution?
P.S: I have tried the following one but it didn't work
messages.foreachRDD({RDD =>
EsSpark.saveJsonToEs(RDD, "Test/Type")
})
Created 06-10-2016 03:30 PM
elasticsearch and Spark
https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html
I would recommend land your data in HDFS and you can point ES or SOLR at it
Created 06-08-2016 07:05 PM
If you're open to technology alternatives - consider NiFi. It will allow you to visually connect your Kafka source, filter the data containing the json path values you need and post them all to the ElasticSearch. All without writing a single line of code.
Created 06-08-2016 07:08 PM
Unfortunately, not. I should do it with all technology I have. I am using Zeppelin and I have elasticsearch and spark interpreter. Is there any way to do that? It would be really great if I can find the solution.
Created 06-10-2016 03:30 PM
elasticsearch and Spark
https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html
I would recommend land your data in HDFS and you can point ES or SOLR at it