Spark csv writing issue on version 1.6.1, spark csv version 1.4

spark-csv does not reliably save the DataFrame: sometimes the output is written and sometimes it is blank. When I call dataframe.count() before dataframe.write, it always returns the correct count, but dataframe.write still does not save the data as CSV through spark-csv.


Re: Spark csv writing issue on version 1.6.1, spark csv version 1.4


@Ashnee Sharma, can you please provide a code sample where you are seeing this problem?

Re: Spark csv writing issue on version 1.6.1, spark csv version 1.4

Hi @Ashnee Sharma, can you share a snippet of the code you're using, along with any exceptions you see? Please be aware that the DataFrame "save" APIs have changed several times over the last few Spark versions.

Here's an example I use to save DataFrames to a filesystem, and also to create a Hive table from it:

df.write.format("orc").mode("overwrite").option("path", fs+"/"+name).saveAsTable(name)

Re: Spark csv writing issue on version 1.6.1, spark csv version 1.4

@Randy Gelhausen

Please see my code below:

import java.util.HashMap;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

// sparkContext, startTime, coalesce, partitions and the TABLE_* DataFrames
// are presumably fields defined elsewhere in the class (not shown here).
void createCsvAfterJoins() {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("SparkMain");
    SQLContext sqlContext = new SQLContext(sparkContext);

    // Chain of left joins; the column names and the SQL query are elided as "--".
    DataFrame TABLE_BIG = TABLE_IF.join(TABLE_PPS,
            TABLE_IF.col("--").equalTo(TABLE_PPS.col("--")), "left");
    DataFrame TABLE_BIH = TABLE_BIG.join(TABLE_PN,
            TABLE_BIG.col("--").equalTo(TABLE_PN.col("--")), "left");
    DataFrame TABLE_BII = TABLE_BIH.join(TABLE_IP,
            TABLE_BIH.col("--").equalTo(TABLE_IP.col("--")), "left");
    DataFrame TABLE_BIJ = TABLE_BII.join(TABLE_IPC,
            TABLE_BII.col("--").equalTo(TABLE_IPC.col("--")), "left");
    DataFrame TABLE_BIK = TABLE_BIJ.join(TABLE_PBOM,
            TABLE_BIJ.col("--").equalTo(TABLE_PBOM.col("--")), "left");
    DataFrame TABLE_BIL = TABLE_BIK.join(TABLE_PPN,
            TABLE_BIK.col("--").equalTo(TABLE_PPN.col("--")), "left");

    TABLE_BIL.registerTempTable("TABLE_BIL");
    DataFrame MAIN_B = sqlContext.sql("--");

    // count() always reports the expected number of records here.
    System.out.println("\nPHASE_B> " + MAIN_B.count() + "Records with "
            + (System.currentTimeMillis() - startTime) + "ms.");

    writeDataFrameAsCSV(MAIN_B, "MAIN_PB", coalesce, partitions);
    sqlContext.dropTempTable("TABLE_BIL");
}

private static DataFrame readCSVAsDataFrame(SQLContext sqlContext, String csvFile) {
    return sqlContext.read().format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(csvFile);
}

private static void writeDataFrameAsCSV(DataFrame dataFrame, String csvFile,
        boolean coalesce, int partitions) {
    HashMap<String, String> saveOptions = new HashMap<String, String>();
    saveOptions.put("header", "true");
    saveOptions.put("path", csvFile);
    // FAILFAST is documented by spark-csv as a parse mode for reading;
    // it likely has no effect on writes. The save mode is set via mode() below.
    saveOptions.put("mode", "FAILFAST");

    if (coalesce) {
        System.out.println("Coalescing with Partitions " + partitions);
        dataFrame.coalesce(partitions).write()
                .format("com.databricks.spark.csv")
                .options(saveOptions)
                .mode(SaveMode.Overwrite)
                .save();
    } else {
        System.out.println("Saving without coalesce");
        dataFrame.write()
                .format("com.databricks.spark.csv")
                .options(saveOptions)
                .mode(SaveMode.Overwrite)
                .save();
    }
}
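
To narrow down what "blank" means here, it may help to inspect the output directory once save() has returned: whether any part-* files exist, how many bytes they hold, and whether the _SUCCESS marker that Hadoop's output committer normally writes is present. The following is only a minimal sketch under the assumption that the output path (for example MAIN_PB) is on the local filesystem; it will not see HDFS paths. CsvOutputCheck and its Summary type are hypothetical helpers, not part of Spark or spark-csv.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic helper; not part of Spark or spark-csv.
// Assumes the output directory is on the local filesystem.
class CsvOutputCheck {

    // Plain value holder for what was found in the output directory.
    static final class Summary {
        final int partFiles;         // number of part-* files found
        final long totalBytes;       // combined size of those files
        final boolean successMarker; // true if a _SUCCESS file exists

        Summary(int partFiles, long totalBytes, boolean successMarker) {
            this.partFiles = partFiles;
            this.totalBytes = totalBytes;
            this.successMarker = successMarker;
        }
    }

    // Counts the part-* files directly inside outputDir and sums their sizes.
    // Hidden checksum files such as .part-00000.crc do not match the glob.
    static Summary summarize(Path outputDir) throws IOException {
        int partFiles = 0;
        long totalBytes = 0L;
        try (DirectoryStream<Path> stream =
                Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    partFiles++;
                    totalBytes += Files.size(p);
                }
            }
        }
        boolean successMarker = Files.exists(outputDir.resolve("_SUCCESS"));
        return new Summary(partFiles, totalBytes, successMarker);
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: CsvOutputCheck <output-directory>");
            return;
        }
        Path dir = Paths.get(args[0]);
        if (!Files.isDirectory(dir)) {
            System.out.println(dir + " is not a directory");
            return;
        }
        Summary s = summarize(dir);
        System.out.println("part files: " + s.partFiles
                + ", bytes: " + s.totalBytes
                + ", _SUCCESS present: " + s.successMarker);
    }
}
```

Running it against the directory passed to writeDataFrameAsCSV after each job would show whether a "blank" run produced no part files at all or part files with zero bytes, which are different symptoms and worth reporting separately.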