
save spark-csv output to hive in HDP 2.3.2

Expert Contributor

Hi,

I am a newbie to Spark and am using Spark 1.4.1.

How can I save the output to Hive as an external table?

For instance, I have a CSV file which I am parsing with the spark-csv package, which gives me a DataFrame.

Now how do I save this DataFrame as a Hive external table using HiveContext?

Would really appreciate your pointers/guidance.

Thanks,

Divya

1 ACCEPTED SOLUTION

Guru

Hey Divya,

There are a couple of ways to do this, but the main flow is as follows.

  • Load/parse the data into a DataFrame. It seems like you have already done this, but since you didn't pass along that snippet I'm just going to make something up. You did mention you were using the spark-csv package, so the example does the same.
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .option("inferSchema", "true") // Automatically infer data types
        .load("cars.csv")
  • Write the DataFrame to the HDFS location where you plan to create the Hive external table, or to the directory of an existing Hive table.
    // Note: write is a parameterless method in Scala, so it's .write, not .write()
    df.select("year", "model").write
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("hdfs://hdfs_location/newcars.csv")
  • Create the external Hive table by creating a HiveContext (a partitioned ORC variant is sketched after this list).
    val hiveSQLContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Several other options can be passed in here for other formats, partitions, etc.
    // The ROW FORMAT clause matches the comma-delimited files written above
    hiveSQLContext.sql("CREATE EXTERNAL TABLE cars(year INT, model STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs_location'")
  • Query the Hive table with whatever query you wish
    // Queries are expressed in HiveQL
    hiveSQLContext.sql("SELECT * FROM cars").collect().foreach(println)


5 REPLIES

Master Mentor

@Divya Gehlot Could you share the code here? I would love to test it in my env.


Rising Star

Thanks for the explanation.

Expert Contributor

Hi

@Neeraj Sabharwal

@Jeremy Dyer

  • Processing and inserting data into Hive without a custom schema

//Processing and inserting data in hive without schema 
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.orc._
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val df = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/tmp/cars.csv")
val selectedData = df.select("year", "model")
selectedData.write.format("orc").option("header", "true").save("/tmp/newcars_orc_cust17")
//permission issues as user hive 
// org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.security.AccessControlException: Permission denied: user=hive, access=WRITE, inode="/tmp/newcars_orc_cust17":hdfs:hdfs:drwxr-xr-x
//Updated /tmp/newcars_orc_cust17 directory permissions to fix this (a sketch follows the output below)
hiveContext.sql("create external table newcars_orc_ext_cust17(year string,model string) stored as orc location '/tmp/newcars_orc_cust17'")
hiveContext.sql("show tables").collect().foreach(println)
[cars_orc_ext,false]
[cars_orc_ext1,false]
[cars_orc_exte,false]
[newcars_orc_ext_cust17,false]
[sample_07,false]
[sample_08,false]
hiveContext.sql("select * from newcars_orc_ext_cust17").collect().foreach(println)
Took 1.459321 s
[2012,S]
[1997,E350]
[2015,Volt]
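
The permission fix mentioned in the comments above isn't shown in the transcript. A minimal sketch using the Hadoop FileSystem API from the same spark-shell session, assuming a blanket chmod on the scratch directory is acceptable (hdfs dfs -chmod -R 777 /tmp/newcars_orc_cust17 from a shell would be equivalent):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

val fs = FileSystem.get(sc.hadoopConfiguration)
// 777 is the quickest fix for a scratch directory; chown-ing to the hive
// user or granting a tighter mode is better practice
fs.setPermission(new Path("/tmp/newcars_orc_cust17"), new FsPermission(Integer.parseInt("777", 8).toShort))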

Hive console

hive> show tables ;
OK
cars_orc_ext
cars_orc_ext1
cars_orc_exte
newcars_orc_ext_cust17
sample_07
sample_08
Time taken: 12.185 seconds, Fetched: 6 row(s)
hive> select * from newcars_orc_ext_cust17 ;
OK
2012    S
1997    E350
2015    Volt
Time taken: 48.922 seconds, Fetched: 3 row(s)

Now when I try the same code with a custom schema defined, I get the errors below:

  • Processing and inserting data into Hive with a custom schema
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val customSchema = StructType( StructField("year", IntegerType, true),StructField("make", StringType, true),StructField("model", StringType, true),StructField("comment", StringType, true),StructField("blank", StringType, true))
<console>:24: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
  val customSchema = StructType( StructField("year", IntegerType, true),StructField("make", StringType, true),StructField("model", StringType, true),StructField("comment", StringType, true),StructField("blank", StringType, true))

Any help/pointers appreciated

Thanks

Explorer

You have to pass the fields as a single collection: an Array of StructFields, a java.util.List of StructFields, or a Seq of StructFields, as the error message's alternatives show.

Here you could pass in a Seq of StructFields. That's one of the constructors: (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType

So...

val customSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))
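
For completeness, a minimal end-to-end sketch using that schema with spark-csv and an ORC write, assuming the same cars.csv layout as above; the file path, output directory, and table name are illustrative:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val hiveContext = new HiveContext(sc)

// Note the Seq(...) wrapper -- this is what the overload error was about
val customSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))

// Pass the schema explicitly instead of inferring it; the header line is still skipped
val df = hiveContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(customSchema)
  .load("/tmp/cars.csv")

// Write ORC and point an external table at it, as in the earlier reply
df.select("year", "model").write.format("orc").save("/tmp/newcars_orc_custom")
hiveContext.sql("create external table newcars_orc_custom(year int, model string) stored as orc location '/tmp/newcars_orc_custom'")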