How to save all the output of a Spark SQL query into a text file

Expert Contributor

I am writing a simple consumer program using Spark Streaming. My code saves some of the data to the file, but not all of it. Can anyone help me fix this? I am not sure where I am losing the data.

import org.apache.spark.streaming.kafka.*;
import kafka.serializer.StringDecoder;
import java.io.*;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
public class ConsumerFile {
	public static void main(String[] args){
	Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
	    String topic = args[0];
	    String broker = args[1];
	    final String path = args[2];
	    SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
	    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
	    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

	    HashMap<String, String> kafkaParams = new HashMap<String, String>();

	    kafkaParams.put("metadata.broker.list", broker);
	    JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
	        ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
	        kafkaParams, topicsSet);
    
    // Keep only the message value from each (key, value) Kafka record.
    JavaDStream<String> words = kafkaStream.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });
    words.foreachRDD(
    	      new Function2<JavaRDD<String>, Time, Void>() {
    	   public Void call(JavaRDD<String> rdd, Time time) {
	   SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
    	          // Convert JavaRDD<String> to JavaRDD<JavaRow> to DataFrame
    	          JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
    	            public JavaRow call(String line) throws Exception{
  	            	String[] fields = line.split(",");
    	              JavaRow record = new JavaRow(
    	                  fields[0], fields[1], fields[2], fields[3], fields[4], fields[5],
    	                  Long.parseLong(fields[6].trim()), fields[7], fields[8], fields[9],
    	                  Long.parseLong(fields[10].trim()), Long.parseLong(fields[11].trim()),
    	                  Long.parseLong(fields[12].trim()), Long.parseLong(fields[13].trim()),
    	                  fields[14], fields[15], fields[16], fields[17],
    	                  Long.parseLong(fields[18].trim()), fields[19],
    	                  Long.parseLong(fields[20].trim()), Long.parseLong(fields[21].trim()));

    	              return record;

    	            }

    	          });

    	          DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
    	          wordsDataFrame.registerTempTable("Data");
    	          DataFrame wDataFrame = sqlContext.sql(" select * from Data");  
    	          if(!wDataFrame.rdd().isEmpty()){
    	    	 wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
    	          return null;
    	        }} );
    ssc.start();
     ssc.awaitTermination();}
}


1 ACCEPTED SOLUTION

Master Guru

Hello Hoda,

I think I know the problem. When you use foreachRDD, your function is executed on each RDD of the DStream, and you save every one of them to the same path. So they overwrite each other's data, and the first or last writer wins.

There are save functions available on the DStream itself, so you could transform the data with mapPartitions instead of foreachRDD and then save it with DStream.saveAsTextFiles.

Or, the easiest way: save each RDD under a unique name.

wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path + time.milliseconds());}

I think the time variable is already passed in by foreachRDD (it is the second argument of your function); if not, you would have to create a current timestamp yourself. This is not very elegant, since you could get the same timestamp twice, but it is actually how the Spark Streaming developers do it if you look into the DStream.saveAsTextFiles method. You could make the name even more unique by appending a random number large enough to never produce duplicates, or by finding a way to get the executor id. I would prefer the latter; if you find a way to get it, I would be thankful :-).
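To make the "timestamp plus something random" idea concrete, here is a minimal sketch in plain Java. The class name BatchPaths and the method uniquePath are made up for illustration and are not part of Spark; the sketch only assumes that the batch time is available as a long, as it is from time.milliseconds() inside foreachRDD.

```java
import java.util.UUID;

// Hypothetical helper (not a Spark class): builds one output directory name per batch.
final class BatchPaths {
    private BatchPaths() {}

    // basePath:    output prefix, e.g. the HDFS path passed in as args[2]
    // batchTimeMs: batch time in milliseconds, e.g. time.milliseconds() in foreachRDD
    static String uniquePath(String basePath, long batchTimeMs) {
        // The random UUID keeps two writers that share a timestamp from colliding.
        return basePath + "-" + batchTimeMs + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(uniquePath("/tmp/out", System.currentTimeMillis()));
    }
}
```

With a helper like this, the save call in the question would become saveAsTextFile(BatchPaths.uniquePath(path, time.milliseconds())). A UUID is used here instead of the executor id simply because it needs no access to Spark internals.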

Expert Contributor

Thank you so much! you saved me a lot of time.

Expert Contributor

One more thing: it works fine, but it still saves the part-0000 file (from the last or first RDD) as well. Is there any way I can get rid of that?

Master Guru

Hmm, good question. Can you tell me how you start the program, i.e. what is in the path variable? Are you sure that the part files are not simply old ones still around from before? I do not see any other function in your code that would write the file. Is the content changing?

Expert Contributor

No, they are not old files. I run my program with the spark-submit command, and my path variable is the path to my HDFS directory. Also, yes, the content is changing.

Expert Contributor

@Benjamin Leonhardi Do you have any sample Java code that uses mapPartitions instead of foreachRDD?

Master Guru

@hoda moradi

Unfortunately not for Java, only for Scala, but the general idea is the same: it just transforms the fields. The Extractor class is a Java or Scala class that converts one object into another. For example, if you have columns, you could write a CSV parser that parses each record and returns a structured object containing all the fields you need, applying any transformations along the way. I think I should write a quick article about that sometime.

val parsedStream = inputStream.mapPartitions { records =>
  // Create the extractor once per partition, then reuse it for every record.
  val extractor = new Extractor(field, regex)
  records.map { record => extractor.parse(record) }
}
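Since the question asked for Java, here is a rough Java sketch of the same per-partition pattern, written without Spark so it can be run on its own. CsvExtractor and PartitionParser are hypothetical names standing in for the Extractor above. As far as I recall, in the Spark 1.x Java API the function passed to mapPartitions is a FlatMapFunction<Iterator<T>, U> whose call method returns an Iterable<U>, so the body of parsePartition is roughly what would go inside call; please check the Javadoc for your Spark version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the Extractor in the Scala snippet: splits one line into fields.
final class CsvExtractor {
    private final String delimiter;

    CsvExtractor(String delimiter) {
        this.delimiter = delimiter;
    }

    List<String> parse(String record) {
        List<String> fields = new ArrayList<>();
        // Limit -1 keeps trailing empty fields instead of silently dropping them.
        for (String field : record.split(delimiter, -1)) {
            fields.add(field.trim());
        }
        return fields;
    }
}

final class PartitionParser {
    private PartitionParser() {}

    // Handles one whole partition: the extractor is built once and reused for every record.
    static List<List<String>> parsePartition(Iterator<String> records) {
        CsvExtractor extractor = new CsvExtractor(",");
        List<List<String>> parsed = new ArrayList<>();
        while (records.hasNext()) {
            parsed.add(extractor.parse(records.next()));
        }
        return parsed;
    }

    public static void main(String[] args) {
        Iterator<String> partition = Arrays.asList("a, b,c", "1,2,").iterator();
        System.out.println(parsePartition(partition));
    }
}
```

The point of doing this per partition rather than per record is that any expensive setup (a compiled regex, a parser, a connection) happens once per partition instead of once per line.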

Expert Contributor

@Benjamin Leonhardi Thank you for your response. Based on your suggestion, I have to apply the mapPartitions method to my JavaDStream, which returns another JavaDStream. I cannot call saveAsTextFile() on a JavaDStream, so I have to use foreachRDD to be able to call saveAsTextFile. Therefore I will have the same problem again. Correct me if I am wrong, because I am new to Spark.

Master Guru

Hello Hoda,

Yes, you would basically do the same. But there are functions on the DStream that already do this for you, saveAsTextFiles and saveAsObjectFiles, although as I said they essentially do what you did before, i.e. save each RDD using a timestamp in the file name. @hoda moradi

https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/streaming/dstream/DStream.html
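The DStream documentation describes the name generated for each batch interval as "prefix-TIME_IN_MS.suffix". The sketch below re-creates that naming scheme in plain Java to show what ends up on disk; BatchFileNames and fileName are hypothetical names, and the handling of an empty prefix or suffix is my assumption about how the scheme behaves, not code copied from Spark.

```java
// Hypothetical re-creation of the documented "prefix-TIME_IN_MS.suffix" naming scheme.
final class BatchFileNames {
    private BatchFileNames() {}

    static String fileName(String prefix, String suffix, long batchTimeMs) {
        String result = Long.toString(batchTimeMs);
        // Assumption: an empty prefix or suffix is simply left out of the name.
        if (prefix != null && !prefix.isEmpty()) {
            result = prefix + "-" + result;
        }
        if (suffix != null && !suffix.isEmpty()) {
            result = result + "." + suffix;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fileName("hdfs:///data/out", "txt", 1458000000000L));
    }
}
```

From Java, the underlying Scala DStream should be reachable through JavaDStream.dstream(), so a call along the lines of words.dstream().saveAsTextFiles(prefix, suffix) is one option to try; both arguments have to be passed explicitly because Scala default arguments are not visible from Java.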