<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question how to save streaming context in elasticsearch? in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128698#M31264</link>
    <description>&lt;P&gt;In the streaming context, there are messages from Kafka that follow one of the following patterns:&lt;/P&gt;
&lt;PRE&gt;{"item":"bed","source":"Central","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}&lt;/PRE&gt;
&lt;PRE&gt;{"message":"","tags":["_jsonparsefailure"],"@version":"1","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}&lt;/PRE&gt;
&lt;P&gt;And this is the code:&lt;/P&gt;
&lt;PRE&gt;import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import _root_.kafka.serializer.StringDecoder
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

val ssc = new StreamingContext(sc, Seconds(10))
val topics = Set("...")
val kafkaParams = Map[String, String]("bootstrap.servers" -&amp;gt; "...")
// Keep only the message value of each (key, value) pair
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)&lt;/PRE&gt;
&lt;P&gt;I want to save the messages that contain "item" and "source" as JSON documents in an Elasticsearch index. What would be the proper solution?&lt;/P&gt;
&lt;P&gt;P.S.: I tried the following, but it didn't work:&lt;/P&gt;
&lt;PRE&gt;messages.foreachRDD({ RDD =&amp;gt;
  EsSpark.saveJsonToEs(RDD, "Test/Type")
})&lt;/PRE&gt;</description>
    <pubDate>Thu, 09 Jun 2016 01:44:23 GMT</pubDate>
    <dc:creator>saba87soltani</dc:creator>
    <dc:date>2016-06-09T01:44:23Z</dc:date>
    <item>
      <title>how to save streaming context in elasticsearch?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128698#M31264</link>
      <description>&lt;P&gt;In the streaming context, there are messages from Kafka that follow one of the following patterns:&lt;/P&gt;
&lt;PRE&gt;{"item":"bed","source":"Central","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}&lt;/PRE&gt;
&lt;PRE&gt;{"message":"","tags":["_jsonparsefailure"],"@version":"1","@version":"1","@timestamp":"2015-06-08T13:39:53.40","host":"...","headers":{....}}&lt;/PRE&gt;
&lt;P&gt;And this is the code:&lt;/P&gt;
&lt;PRE&gt;import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import _root_.kafka.serializer.StringDecoder
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

val ssc = new StreamingContext(sc, Seconds(10))
val topics = Set("...")
val kafkaParams = Map[String, String]("bootstrap.servers" -&amp;gt; "...")
// Keep only the message value of each (key, value) pair
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)&lt;/PRE&gt;
&lt;P&gt;I want to save the messages that contain "item" and "source" as JSON documents in an Elasticsearch index. What would be the proper solution?&lt;/P&gt;
&lt;P&gt;P.S.: I tried the following, but it didn't work:&lt;/P&gt;
&lt;PRE&gt;messages.foreachRDD({ RDD =&amp;gt;
  EsSpark.saveJsonToEs(RDD, "Test/Type")
})&lt;/PRE&gt;</description>
      <pubDate>Thu, 09 Jun 2016 01:44:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128698#M31264</guid>
      <dc:creator>saba87soltani</dc:creator>
      <dc:date>2016-06-09T01:44:23Z</dc:date>
    </item>
    <item>
      <title>Re: how to save streaming context in elasticsearch?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128699#M31265</link>
      <description>&lt;P&gt;If you're open to alternative technologies, consider NiFi. It lets you visually connect your Kafka source, filter the data containing the JSON path values you need, and post them all to Elasticsearch, all without writing a single line of code.&lt;/P&gt;</description>
      <pubDate>Thu, 09 Jun 2016 02:05:14 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128699#M31265</guid>
      <dc:creator>andrewg</dc:creator>
      <dc:date>2016-06-09T02:05:14Z</dc:date>
    </item>
    <item>
      <title>Re: how to save streaming context in elasticsearch?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128700#M31266</link>
      <description>&lt;P&gt;Unfortunately not. I have to do it with the technology I already have. I am using Zeppelin, with the Elasticsearch and Spark interpreters. Is there any way to do that? It would be really great if I could find a solution.&lt;/P&gt;</description>
      <pubDate>Thu, 09 Jun 2016 02:08:50 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128700#M31266</guid>
      <dc:creator>saba87soltani</dc:creator>
      <dc:date>2016-06-09T02:08:50Z</dc:date>
    </item>
    <item>
      <title>Re: how to save streaming context in elasticsearch?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128701#M31267</link>
      <description>&lt;P&gt;Elasticsearch and Spark:&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html" target="_blank"&gt;https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;I would recommend landing your data in HDFS; you can then point ES or Solr at it.&lt;/P&gt;</description>
      <pubDate>Fri, 10 Jun 2016 22:30:36 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/how-to-save-streaming-context-in-elasticsearch/m-p/128701#M31267</guid>
      <dc:creator>TimothySpann</dc:creator>
      <dc:date>2016-06-10T22:30:36Z</dc:date>
    </item>
  </channel>
</rss>

