
Migrating from one Hive table to another using Spark, with different column names and a different database on the same cluster

Expert Contributor

Hive table:

Original table

Database name: Student

Table name: Student_detail

id  name  dept
1   siva  cse

Required output:

Database name: CSE

Table name: New_tudent_detail

s_id  s_name  s_dept
1     siva    cse

I want to migrate the Student_detail Hive table into New_tudent_detail using Spark, without data loss, with:

Different column names

Different database

Different table

1 ACCEPTED SOLUTION

Contributor

Hi @Sivasaravanakumar K

Here's one way of going about this.

Note the example below is based on the sample data available on the Hortonworks sandbox. Just change the database, table, and column names to suit your needs.
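For instance, for the tables in the question, the substitution would look something like this (a sketch on my part, using the names from the question and the same sqlContext as in the steps below; it assumes the target CSE database already exists, see the note after step 3):

// read with SQL aliases to rename the columns, then save to the new database/table
val df = sqlContext.sql("select id as s_id, name as s_name, dept as s_dept from Student.Student_detail")
df.write.format("orc").saveAsTable("CSE.New_tudent_detail")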

0. Get database and table info

//show databases in Hive
sqlContext.sql("show databases").show

//show tables in a database
sqlContext.sql("show tables in default").show

//print the table schema
sqlContext.sql("select * from default.sample_07").printSchema

result

+--------+
|  result|
+--------+
| default|
|foodmart|
|  xademo|
+--------+

+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|sample_07|      false|
|sample_08|      false|
+---------+-----------+

root
 |-- code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- total_emp: integer (nullable = true)
 |-- salary: integer (nullable = true)

1. Read table data into a DataFrame:

// read data from Hive
val df = sqlContext.sql("select * from default.sample_07")
//Show Table Schema 
df.printSchema

result

root
 |-- code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- total_emp: integer (nullable = true)
 |-- salary: integer (nullable = true)

2. Change column names

Change a single column name with the withColumnRenamed function

val df_renamed = df.withColumnRenamed("salary", "money") 
df_renamed.printSchema 

Or rename all columns at once using a list of new names:

val newNames = Seq("code_1", "description_1", "total_emp_1", "money_1") 
val df_renamed = df.toDF(newNames: _*) 
df_renamed.printSchema 

Note you can chain this with the read so as not to create two sets of data in memory:

val newNames = Seq("code_1", "description_1", "total_emp_1", "money_1") 
val df = sqlContext.sql("select * from default.sample_07").toDF(newNames: _*)

Or do it all at once using SQL aliases (preferred):

val df = sqlContext.sql("select code as code_1, description as description_1, total_emp as total_emp_1, salary as money from default.sample_07") 

df.printSchema

result (using SQL alias)

df: org.apache.spark.sql.DataFrame = [code_1: string, description_1: string, total_emp_1: int, money: int]
root
 |-- code_1: string (nullable = true)
 |-- description_1: string (nullable = true)
 |-- total_emp_1: integer (nullable = true)
 |-- money: integer (nullable = true)

3. Save back to Hive

//write to Hive (in ORC format) 
df.write.format("orc").saveAsTable("default.sample_07_new_schema") 

//read back and check the new schema
sqlContext.sql("select * from default.sample_07_new_schema").printSchema

result

root
 |-- code_1: string (nullable = true)
 |-- description_1: string (nullable = true)
 |-- total_emp_1: integer (nullable = true)
 |-- money: integer (nullable = true)
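Two small additions for the question's scenario (my assumptions, not part of the original answer): the target database must exist before saveAsTable can write into it, and re-running the write needs an explicit save mode or it will fail because the table already exists:

// create the target database if it does not exist yet
sqlContext.sql("create database if not exists CSE")

// overwrite the target table on re-runs instead of failing
df.write.format("orc").mode("overwrite").saveAsTable("default.sample_07_new_schema")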


6 REPLIES

Expert Contributor

Hi @Matthieu Lamairesse

Error:

scala> df.write.format("orc").saveAsTable("default.sample_07_new_schema")
<console>:33: error: value write is not a member of org.apache.spark.sql.DataFrame
              df.write.format("orc").saveAsTable("default.sample_07_new_schema")
                 ^

Contributor

Hi @Sivasaravanakumar K

I've simplified my answer a bit. What version of Spark are you using? This was tested on Spark 1.6.2 on an HDP 2.5 sandbox.

Note: when using spark-shell, did you import:

import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._

Expert Contributor

I already imported:

import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._

I still have the same issue. I am using HDP 2.3.

Contributor

Um, which version of Spark?

1.3.1 => HDP 2.3.0

1.4.1 => HDP 2.3.2

1.5.2 => HDP 2.3.4

I have a feeling it's Spark 1.3; they made some major improvements in the Spark <=> Hive integration starting with Spark 1.4.1.
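If in doubt, the running version can be checked directly from the shell:

// prints the Spark version bound to this shell session
println(sc.version)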

Contributor

Hi @Sivasaravanakumar K

The write function was only implemented in Spark 1.4.1, so it is not available on your version.

Try simply:

df.saveAsTable("default.sample_07_new_schema") 

It will be saved as Parquet (the default format for Spark).
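If ORC output is still needed on a Spark version without df.write, one possible workaround (a sketch, assuming sqlContext is a HiveContext as it is on the sandbox) is to register the DataFrame as a temporary table and let Hive write the ORC table via CREATE TABLE AS SELECT:

// expose the DataFrame to HiveQL under a temporary name
df.registerTempTable("sample_07_tmp")

// have Hive create the ORC table from the temporary table (CTAS)
sqlContext.sql("create table default.sample_07_new_schema stored as orc as select * from sample_07_tmp")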