Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Insert data from spark data frame to hive orc table slow and use much memory

avatar
Explorer

Hello everyone,

I am new for spark processing. I need you help my problem when transform data from plat file to hive orc table. Below is my process flow using pyspark:
1 - Use pyspark to load flat file to dataframe

2 - Transform data in dataframe and insert to hive table(parquet)

3 - Insert data from hive(parquet) to orc format

 

Step 1 and 2 is fast, but step 3 is too slow because it use much memory. Sometime it stuck cannot continue.

 

Please help to advice and recommend the better flow.

 

Thanks

 

Here is sample code:

-- loading.py

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext,SQLContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import input_file_name,col,array_contains


spark = SparkSession.builder.appName("testing..").enableHiveSupport().getOrCreate()


df_schema = StructType([
StructField("col1",StringType(),True)
,StructField("col2",StringType(),True)
,StructField("col3",StringType(),True)
,StructField("col4",StringType(),True)
,StructField("col5",StringType(),True)
,StructField("filename",StringType(),True)
,StructField("YEARKEY",StringType(),True)
,StructField("MONTHKEY",StringType(),True)
,StructField("DAYKEY",StringType(),True)
])


dsCSV = spark.read.format("csv").options(header='False', delimiter=';').schema(df_schema).load("/user/test/processing/data/out").withColumn("filename",input_file_name())
dsCSV.registerTempTable("cdr_data")

df_insert=spark.sql("select * from cdr_data")

df_insert.write.option("compression","snappy").mode('append').format('parquet').partitionBy("yearkey","monthkey","daykey").saveAsTable(('landing.test_loading'))

dsCSV.unpersist()
dsCSV.unpersist(True)
df_insert.unpersist()
df_insert.unpersist(True)

 

 

-- cdr_hivesql.sh

v_history_records="insert into staging.test_loading select * from landing.test_loading"

echo "====================>>>`date +%Y%m%d%H%M%S`<<<====================="
echo ""
echo $v_history_records
hive -e "$v_history_records;"

 

Note:

-- landing.test_loading(parquet format)
-- staging.test_loading(orc format)

 

1 REPLY 1

avatar
Expert Contributor

It appears that you're currently following a two-step process: writing data to a Parquet table and then using that Parquet table to write to an ORC table. You can streamline this by directly writing the data into the ORC format table, eliminating the need to write the same data to a Parquet table before reading it. 

Ref - 
https://spark.apache.org/docs/2.4.0/sql-data-sources-hive-tables.html

https://docs.cloudera.com/cdp-private-cloud-base/7.1.9/developing-spark-applications/topics/spark-lo...