<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: How to save all the output of spark sql query into a text file. in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133921#M23221</link>
    <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/2833/hodamoradi2014.html" nodeid="2833"&gt;@hoda moradi&lt;/A&gt;
&lt;/P&gt;&lt;P&gt;Unfortunately  not for Java but Scala the general difference is just that it changes the fields. The Extractor class is a Java or Scala class that changes an object from one object into another. For example you have columns, you could create a CSV parser that parses the file and returns a structured object containing all fields you need and do any transformations. I think I should do a quick article about that sometimes.&lt;/P&gt;&lt;PRE&gt;var parsedStream = inputStream.mapPartitions {
  records =&amp;gt;
  val extractor = new Extractor(field,regex); 
  records.map { 
    record =&amp;gt; extractor.parse(record) 
  } 
}&lt;/PRE&gt;</description>
    <pubDate>Tue, 22 Mar 2016 18:47:32 GMT</pubDate>
    <dc:creator>bleonhardi</dc:creator>
    <dc:date>2016-03-22T18:47:32Z</dc:date>
    <item>
      <title>How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133914#M23214</link>
      <description>&lt;P&gt;I am writing a simple consumer program using spark streaming. My code save some of the data in to the file but not ALL of the data. Can anyone help me how to fix this. I am not sure where I am losing the data.&lt;/P&gt;&lt;PRE&gt;import org.apache.spark.streaming.kafka.*;
import kafka.serializer.StringDecoder;
import java.io.*;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
public class ConsumerFile {
	public static void main(String[] args){
	Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
	  String topic = args[0];
	 final String path=new String(args[2]);
	  String broker = args[1];
	    SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
	    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
	    HashSet&amp;lt;String&amp;gt; topicsSet = new HashSet&amp;lt;String&amp;gt;(Arrays.asList(topic.split(",")));

	    HashMap&amp;lt;String, String&amp;gt; kafkaParams = new HashMap&amp;lt;String, String&amp;gt;();

	    kafkaParams.put("metadata.broker.list", broker);
	    JavaPairInputDStream&amp;lt;String, String&amp;gt; kafkaStream = KafkaUtils.createDirectStream(
	    ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
topicsSet
);
    
    JavaDStream&amp;lt;String&amp;gt; words = kafkaStream.map(new Function&amp;lt;Tuple2&amp;lt;String, String&amp;gt;, String&amp;gt;() 

                                             {
                       public String call(Tuple2&amp;lt;String, String&amp;gt; message)

                                                 {
                                                     return message._2();}});
    words.foreachRDD(
    	      new Function2&amp;lt;JavaRDD&amp;lt;String&amp;gt;, Time, Void&amp;gt;() {
    	   public Void call(JavaRDD&amp;lt;String&amp;gt; rdd, Time time) {
	   SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
    	          // Convert RDD[String] to RDD[case class] to DataFrame
    	          JavaRDD&amp;lt;JavaRow&amp;gt; rowRDD = rdd.map(new Function&amp;lt;String, JavaRow&amp;gt;() {
    	            public JavaRow call(String line) throws Exception{
  	            	String[] fields = line.split(",");
    	              JavaRow record = new JavaRow(fields[0], fields[1],fields[2], fields[3], f		ields[4], fields[5],Long.parseLong(fields[6].trim()), fields[7],fields[8],fields[9],
Long.parseLong(fields[10].trim()),Long.parseLong(fields[11].trim()),Long.parseLong(fields[12].trim()),Long.parseLong(fields[13].trim()),fields[14],fields[15],fields[16],fields[17],Long.parseLong(fields[18].trim()),fields[19],Long.parseLong(fields[20].trim()),Long.parseLong(fields[21].trim())  );

    	              return record;

    	            }

    	          });

    	          DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
    	          wordsDataFrame.registerTempTable("Data");
    	          DataFrame wDataFrame = sqlContext.sql(" select * from Data");  
    	          if(!wDataFrame.rdd().isEmpty()){
    	    	 wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
    	          return null;
    	        }} );
    ssc.start();
     ssc.awaitTermination();}
}


&lt;/PRE&gt;</description>
      <pubDate>Fri, 18 Mar 2016 00:18:38 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133914#M23214</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-03-18T00:18:38Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133915#M23215</link>
      <description>&lt;P&gt;Hello Hoda,&lt;/P&gt;&lt;P&gt;so I think I know the problem. When you do foreachRDD it essentially executes your function on each RDD of the DStream  you save it all to the same file. So they overwrite each others data and the first or last writer wins.&lt;/P&gt;&lt;P&gt;There are savefunctions available on the DStream so you could just transform the data with mapPartition instead of foreachRDD and then save it with DStream.saveAsTextFile.&lt;/P&gt;&lt;P&gt;or the easiest way you save them in a file with a unique name. &lt;/P&gt;&lt;P&gt; wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path + time.milliseconds.toString);}&lt;/P&gt;&lt;P&gt;I think the time variable comes already in automatically with foreachRDD but you might have to instantiate a current date before if not. Now this is not very elegant since you could have the same timestamp twice but that is actually how the spark streaming guys do it if you look into the DStream.saveAsTextFile method. You could make this even more unique by adding a random number that is large enough to never run into duplicates or find a way to get the executor id. I would prefer this if you find a way to get it I would be thankful :-).&lt;/P&gt;</description>
      <pubDate>Fri, 18 Mar 2016 03:50:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133915#M23215</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-03-18T03:50:55Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133916#M23216</link>
      <description>&lt;P&gt;Thank you so much! you saved me a lot of time.&lt;/P&gt;</description>
      <pubDate>Fri, 18 Mar 2016 04:15:47 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133916#M23216</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-03-18T04:15:47Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133917#M23217</link>
      <description>&lt;P&gt;One more thing, it works fine but still it saves the part-0000 file(last or first RDD) as well. Is there anyway I can get rid of that?&lt;/P&gt;</description>
      <pubDate>Fri, 18 Mar 2016 04:22:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133917#M23217</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-03-18T04:22:55Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133918#M23218</link>
      <description>&lt;P&gt;hmmm good question, can you tell me how you start the program? I.e. what is in the path variable? You are sure that the part files are not simply old and still around from before? I do not see any other function in your code that would write the file. Is the content changing? &lt;/P&gt;</description>
      <pubDate>Fri, 18 Mar 2016 07:19:03 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133918#M23218</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-03-18T07:19:03Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133919#M23219</link>
      <description>&lt;P&gt;No it is not the old files. I run my program with spark submit command. and my path variable is the path to my hdfs directory. Also, yes the content is changing. &lt;/P&gt;</description>
      <pubDate>Sat, 19 Mar 2016 00:11:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133919#M23219</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-03-19T00:11:21Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133920#M23220</link>
      <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/168/bleonhardi.html" nodeid="168"&gt;@Benjamin Leonhardi&lt;/A&gt; Do you have any sample code for java which use the mapPartition instead of foreachRDD&lt;/P&gt;&lt;P&gt; ? &lt;/P&gt;</description>
      <pubDate>Tue, 22 Mar 2016 02:59:58 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133920#M23220</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-03-22T02:59:58Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133921#M23221</link>
      <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/2833/hodamoradi2014.html" nodeid="2833"&gt;@hoda moradi&lt;/A&gt;
&lt;/P&gt;&lt;P&gt;Unfortunately  not for Java but Scala the general difference is just that it changes the fields. The Extractor class is a Java or Scala class that changes an object from one object into another. For example you have columns, you could create a CSV parser that parses the file and returns a structured object containing all fields you need and do any transformations. I think I should do a quick article about that sometimes.&lt;/P&gt;&lt;PRE&gt;var parsedStream = inputStream.mapPartitions {
  records =&amp;gt;
  val extractor = new Extractor(field,regex); 
  records.map { 
    record =&amp;gt; extractor.parse(record) 
  } 
}&lt;/PRE&gt;</description>
      <pubDate>Tue, 22 Mar 2016 18:47:32 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133921#M23221</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-03-22T18:47:32Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133922#M23222</link>
      <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/168/bleonhardi.html" nodeid="168"&gt;@Benjamin Leonhardi&lt;/A&gt; Thank you for your response. Based on your suggestion, I have to apply mapPartitions method on my JavaDStream . That method will return another JavaDStream to me. I cannot use saveAsTextFile() on the JavaDStream so I have to do foreachRDD to be able to do saveAsTextFile. Therefore, I will have the same problem again. correct me if I am wrong because I am new in spark. &lt;/P&gt;</description>
      <pubDate>Wed, 23 Mar 2016 21:30:04 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133922#M23222</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-03-23T21:30:04Z</dc:date>
    </item>
    <item>
      <title>Re: How to save all the output of spark sql query into a text file.</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133923#M23223</link>
      <description>&lt;P&gt;Hello Hoda,&lt;/P&gt;&lt;P&gt;so yes you would do basically the same. But there are functions on the DStream that do that for you already: saveAsTextFiles and saveAsObjectFiles but as said they essentially do the same you did before. I.e. do a save on each RDD using a timestamp in the filename. &lt;A rel="user" href="https://community.cloudera.com/users/2833/hodamoradi2014.html" nodeid="2833"&gt;@hoda moradi&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/streaming/dstream/DStream.html" target="_blank"&gt;https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/streaming/dstream/DStream.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 24 Mar 2016 18:12:15 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-save-all-the-output-of-spark-sql-query-into-a-text/m-p/133923#M23223</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-03-24T18:12:15Z</dc:date>
    </item>
  </channel>
</rss>

