Hello everyone,
I am new to Spark processing. I need your help with a problem I have when transforming data from a flat file into a Hive ORC table. Below is my process flow using PySpark:
1 - Use PySpark to load the flat file into a DataFrame
2 - Transform the data in the DataFrame and insert it into a Hive table (Parquet)
3 - Insert the data from the Hive (Parquet) table into a Hive table in ORC format
Steps 1 and 2 are fast, but step 3 is too slow because it uses a lot of memory. Sometimes it gets stuck and cannot continue.
Please advise and recommend a better flow.
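One way to reason about the memory use in step 3 is an assumption, not a diagnosis. A dynamic-partition insert into ORC can keep one open writer, each with its own buffers, for every distinct (YEARKEY, MONTHKEY, DAYKEY) combination a task sees. The pure-Python sketch below counts those combinations in a sample of rows and shows the Hive-style directory each one maps to. The helper names `partition_fanout` and `partition_path` are hypothetical, and the sample rows are made up.

```python
from collections import Counter


def partition_fanout(rows, keys=("YEARKEY", "MONTHKEY", "DAYKEY")):
    """Count rows per distinct partition-key combination (hypothetical helper)."""
    return Counter(tuple(row[k] for k in keys) for row in rows)


def partition_path(combo, names=("yearkey", "monthkey", "daykey")):
    """Hive-style directory for one combination, e.g. yearkey=2020/monthkey=01/daykey=01."""
    return "/".join(f"{name}={value}" for name, value in zip(names, combo))


if __name__ == "__main__":
    # Made-up sample rows; in practice these would come from the flat file
    sample = [
        {"YEARKEY": "2020", "MONTHKEY": "01", "DAYKEY": "01"},
        {"YEARKEY": "2020", "MONTHKEY": "01", "DAYKEY": "01"},
        {"YEARKEY": "2020", "MONTHKEY": "01", "DAYKEY": "02"},
    ]
    fanout = partition_fanout(sample)
    print(len(fanout), "partitions")
    for combo, count in sorted(fanout.items()):
        print(partition_path(combo), count)
```

If the number of combinations per load is large, that would be consistent with the memory pressure described above. That outcome would also support the Hive settings that reduce how many writers are open at once.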
Thanks
Here is the sample code:
-- loading.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import input_file_name
spark = SparkSession.builder.appName("testing..").enableHiveSupport().getOrCreate()
# Every field is read as a string; "filename" is filled in below with input_file_name()
df_schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True),
    StructField("filename", StringType(), True),
    StructField("YEARKEY", StringType(), True),
    StructField("MONTHKEY", StringType(), True),
    StructField("DAYKEY", StringType(), True),
])
# Read the semicolon-delimited flat files and record each row's source file
dsCSV = (spark.read.format("csv")
         .options(header="false", delimiter=";")
         .schema(df_schema)
         .load("/user/test/processing/data/out")
         .withColumn("filename", input_file_name()))
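# Expose the DataFrame to Spark SQL as a temporary view
dsCSV.createOrReplaceTempView("cdr_data")
df_insert = spark.sql("select * from cdr_data")
# Append Snappy-compressed Parquet, one directory per (yearkey, monthkey, daykey)
(df_insert.write
    .option("compression", "snappy")
    .mode("append")
    .format("parquet")
    .partitionBy("yearkey", "monthkey", "daykey")
    .saveAsTable("landing.test_loading"))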
# unpersist() only releases data cached with cache()/persist(); nothing is cached here
dsCSV.unpersist()
df_insert.unpersist()
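One variant that may be worth testing is to have Spark write straight into the existing ORC table, so the separate Parquet-to-ORC Hive job in step 3 is no longer needed. This is an untested sketch, not a confirmed fix. The helper name `insert_into_existing_orc_table` is hypothetical. `DataFrameWriter.insertInto` is a real PySpark method that matches columns by position and uses the target table's own storage format. The sketch therefore assumes that `staging.test_loading` already exists, is partitioned by yearkey, monthkey and daykey, and has its columns in the same order as `df_schema` with the partition columns last.

```python
def insert_into_existing_orc_table(spark, df, table_name):
    """Append df to an existing partitioned Hive table, keeping its storage format.

    Hypothetical helper. DataFrameWriter.insertInto matches columns by position,
    so df must list its columns in the table's order, partition columns last.
    """
    # Let partition values come from the data instead of a static PARTITION spec
    spark.conf.set("hive.exec.dynamic.partition", "true")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    # Append (no overwrite); the files are written in the table's format, ORC here
    df.write.insertInto(table_name, overwrite=False)


# Possible use at the end of loading.py, in place of the Parquet write
# (assumes staging.test_loading already exists as a partitioned ORC table):
# insert_into_existing_orc_table(spark, dsCSV, "staging.test_loading")
```

Setting Hive properties through `spark.conf.set` is a common pattern, but its effect can depend on the Spark version and deployment, so it is worth checking on a small sample first.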
-- cdr_hivesql.sh
v_history_records="insert into staging.test_loading select * from landing.test_loading"
echo "====================>>>$(date +%Y%m%d%H%M%S)<<<====================="
echo ""
# Quote the variable so the shell does not expand the * in the query into file names
echo "$v_history_records"
hive -e "$v_history_records;"
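If step 3 stays in Hive, a commonly suggested adjustment is to enable dynamic partitioning explicitly and turn on `hive.optimize.sort.dynamic.partition`. That setting sorts rows by partition key, so each reducer keeps only one ORC writer open at a time. The sketch below only builds that HiveQL text in Python; it does not run anything. The function name `build_orc_insert` is hypothetical. The sketch also assumes that `staging.test_loading` is partitioned by `yearkey`, `monthkey` and `daykey`, which the post does not state. Listing the columns explicitly instead of using `select *` keeps the partition columns last, which Hive's dynamic-partition insert requires.

```python
def build_orc_insert(source, target, data_cols, partition_cols):
    """Return a HiveQL script for a dynamic-partition INSERT (hypothetical helper)."""
    settings = [
        "SET hive.exec.dynamic.partition=true",
        "SET hive.exec.dynamic.partition.mode=nonstrict",
        # Sort by partition key so each reducer keeps one ORC writer open at a time
        "SET hive.optimize.sort.dynamic.partition=true",
    ]
    # Dynamic-partition inserts need the partition columns last in the SELECT
    select_list = ", ".join(list(data_cols) + list(partition_cols))
    insert = (
        f"INSERT INTO TABLE {target} PARTITION ({', '.join(partition_cols)}) "
        f"SELECT {select_list} FROM {source}"
    )
    return ";\n".join(settings + [insert]) + ";"


if __name__ == "__main__":
    print(build_orc_insert(
        "landing.test_loading",
        "staging.test_loading",
        ["col1", "col2", "col3", "col4", "col5", "filename"],
        ["yearkey", "monthkey", "daykey"],
    ))
```

The printed script could then be passed to the existing shell step, for example `hive -e "$(python build_orc_insert.py)"`, where the file name is again hypothetical.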
Note:
-- landing.test_loading (Parquet format)
-- staging.test_loading (ORC format)