Created 12-14-2015 09:31 AM
Hi,
I am new to Spark and am using Spark 1.4.1.
How can I save the output to Hive as an external table?
For instance, I have a CSV file which I am parsing with the spark-csv package, which gives me a DataFrame.
Now how do I save this DataFrame as a Hive external table using HiveContext?
Would really appreciate your pointers/guidance.
Thanks,
Divya
Created 12-14-2015 01:02 PM
Hey Divya,
There are a couple of ways to do this; the main flow, however, is this:

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")      // Use first line of all files as header
  .option("inferSchema", "true") // Automatically infer data types
  .load("cars.csv")

df.select("year", "model").write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("hdfs://hdfs_location/newcars.csv")

val hiveSQLContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Several other options can be passed in here for other formats, partitions, etc.
hiveSQLContext.sql("CREATE EXTERNAL TABLE cars(year INT, model STRING) STORED AS TEXTFILE LOCATION 'hdfs_location'")

// Queries are expressed in HiveQL
hiveSQLContext.sql("SELECT * FROM cars").collect().foreach(println)
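As a side note: if a managed (rather than external) Hive table would also be acceptable, the DataFrameWriter in Spark 1.4 can create one directly from the DataFrame with saveAsTable. A minimal sketch, assuming a spark-shell session (so sc exists) and using the illustrative table name cars_managed:

```scala
// Sketch: letting Spark create a *managed* Hive table in one step.
// Note: saveAsTable does NOT create an external table; for an external
// table, write the files out and run CREATE EXTERNAL TABLE as shown above.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val cars = hiveContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("cars.csv")

// Registers the table in the Hive metastore and writes the data
cars.select("year", "model").write.saveAsTable("cars_managed")
```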
Created 12-14-2015 10:57 AM
@Divya Gehlot Could you share the code here ? I would love to test it in my env.
Created 07-31-2016 01:44 AM
Thanks for the explanation.
Created 12-17-2015 05:01 AM
Hi
// Processing and inserting data in Hive without a schema
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.orc._

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val df = hiveContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/cars.csv")
val selectedData = df.select("year", "model")
selectedData.write.format("orc").option("header", "true").save("/tmp/newcars_orc_cust17")

// Permission issues as user hive:
// org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.security.AccessControlException: Permission denied: user=hive, access=WRITE, inode="/tmp/newcars_orc_cust17":hdfs:hdfs:drwxr-xr-x
// Updated /tmp/newcars_orc_cust17 directory permissions

hiveContext.sql("create external table newcars_orc_ext_cust17(year string, model string) stored as orc location '/tmp/newcars_orc_cust17'")
hiveContext.sql("show tables").collect().foreach(println)
[cars_orc_ext,false]
[cars_orc_ext1,false]
[cars_orc_exte,false]
[newcars_orc_ext_cust17,false]
[sample_07,false]
[sample_08,false]

hiveContext.sql("select * from newcars_orc_ext_cust17").collect().foreach(println)
Took 1.459321 s
[2012,S]
[1997,E350]
[2015,Volt]
Hive console
hive> show tables;
OK
cars_orc_ext
cars_orc_ext1
cars_orc_exte
newcars_orc_ext_cust17
sample_07
sample_08
Time taken: 12.185 seconds, Fetched: 6 row(s)
hive> select * from newcars_orc_ext_cust17;
OK
2012    S
1997    E350
2015    Volt
Time taken: 48.922 seconds, Fetched: 3 row(s)
Now, when I try the same code with a custom schema defined, I get the errors below:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

scala> val customSchema = StructType(StructField("year", IntegerType, true), StructField("make", StringType, true), StructField("model", StringType, true), StructField("comment", StringType, true), StructField("blank", StringType, true))
<console>:24: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
       val customSchema = StructType(StructField("year", IntegerType, true),StructField("make", StringType, true),StructField("model", StringType, true),StructField("comment", StringType, true),StructField("blank", StringType, true))
Any help/pointers appreciated
Thanks
Created 02-11-2018 05:07 PM
You have to pass the StructFields as a single collection: an Array of StructFields, a java.util.List of StructFields, or a Seq of StructFields.
You could have passed in a Seq of StructFields; that's one of the constructors: (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
So...
val customSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))
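With that fix, the schema can be handed straight to the spark-csv reader in place of inferSchema. A minimal sketch, assuming a spark-shell session (so sc exists) and the illustrative file path /tmp/cars.csv:

```scala
// Build the schema from a Seq of StructFields (the Seq overload of
// StructType.apply that the error message lists), then use it to read.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val hiveContext = new HiveContext(sc)

val customSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))

// With an explicit schema, the extra inferSchema pass over the file
// is unnecessary and the column types are fixed up front.
val df = hiveContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(customSchema)
  .load("/tmp/cars.csv")
```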