Support Questions
Find answers, ask questions, and share your expertise

How to append column header to Spark SQL query results without using databricks lib?


New Contributor

Hi all, how can we add a header to Spark SQL query results before saving them to a text file? The Spark version is 1.6.

    // Spark 1.6: mapping over a DataFrame yields an RDD[Row]
    val topPriceResultsDF = sqlContext.sql("SELECT * FROM retail_db.yahoo_stock_orc WHERE open_price > 40 AND high_price > 40 ORDER BY date ASC")
    topPriceResultsDF
      .map(row => row.mkString(","))
      .saveAsTextFile("/user/sparkuser/myspark/data/output/yahoo_above40_results(comma).csv")

This saves only the data, but I also need to add a header line (date,open_price,high_price,low_price,close_price,volume,adj_price). Please help if anyone has an idea! I cannot use the databricks library.

The output should look like:
    date,open_price,high_price,low_price,close_price,volume,adj_price
    1997-07-09,40.75008,45.12504,40.75008,43.99992,37545600,1.83333 

Thanks !!

1 REPLY

Re: How to append column header to Spark SQL query results without using databricks lib?

New Contributor

Use mapPartitions if you want the header in every output file, or when there is a single partition:

val header = "date,open_price,high_price,low_price,close_price,volume,adj_price"

topPriceResultsDF
  .map(row => row.mkString(","))                    // one CSV line per Row
  .mapPartitions(iter => Iterator(header) ++ iter)  // prepend the header in each partition
  .saveAsTextFile("/user/sparkuser/myspark/data/output/yahoo_above40resultsWithHeader.csv")

Use mapPartitionsWithIndex if you want the header only in the first output file (partition 0):

val header = "date,open_price,high_price,low_price,close_price,volume,adj_price"

topPriceResultsDF
  .map(row => row.mkString(","))
  .repartition(2)  // example: two output files; note that repartition shuffles, so the ORDER BY sort is not preserved
  .mapPartitionsWithIndex {
    case (0, iter) => Iterator(header) ++ iter  // header only in partition 0
    case (_, iter) => iter
  }
  .saveAsTextFile("/user/sparkuser/myspark/data/output/yahoo_above40resultsWithHeader.csv")
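Since saveAsTextFile always writes a directory of part files, a common variant is to coalesce down to one partition first so you get a single part file containing exactly one header followed by all rows, in order. A minimal sketch, assuming the same `topPriceResultsDF` and `header` as above (the output path is illustrative):

```scala
val header = "date,open_price,high_price,low_price,close_price,volume,adj_price"

topPriceResultsDF
  .map(row => row.mkString(","))                    // one CSV line per Row
  .coalesce(1)                                      // single partition => single part file
  .mapPartitions(iter => Iterator(header) ++ iter)  // header appears exactly once
  .saveAsTextFile("/user/sparkuser/myspark/data/output/yahoo_above40_single_file")
```

The result is still a directory (with the data in part-00000), and coalescing to one partition funnels all data through a single task, so this only makes sense for modestly sized results.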