Inserting data from a Spark DataFrame into a Hive ORC table is slow and uses a lot of memory
- Labels:
  - Apache Ambari
  - Apache Hive
  - Apache Spark
Created ‎07-14-2022 03:36 AM
Hello everyone,
I am new to Spark processing. I need your help with a problem transforming data from a flat file into a Hive ORC table. Below is my process flow using PySpark:
1 - Use PySpark to load the flat file into a DataFrame
2 - Transform the data in the DataFrame and insert it into a Hive table (Parquet)
3 - Insert the data from the Hive Parquet table into ORC format
Steps 1 and 2 are fast, but step 3 is too slow because it uses a lot of memory. Sometimes it gets stuck and cannot continue.
Please advise and recommend a better flow.
Thanks
Here is sample code:
-- loading.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("testing..").enableHiveSupport().getOrCreate()

df_schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True),
    StructField("filename", StringType(), True),
    StructField("YEARKEY", StringType(), True),
    StructField("MONTHKEY", StringType(), True),
    StructField("DAYKEY", StringType(), True),
])

# Read the semicolon-delimited flat files and tag each row with its source file
dsCSV = (spark.read.format("csv")
         .options(header="false", delimiter=";")
         .schema(df_schema)
         .load("/user/test/processing/data/out")
         .withColumn("filename", input_file_name()))

# createOrReplaceTempView replaces the deprecated registerTempTable
dsCSV.createOrReplaceTempView("cdr_data")
df_insert = spark.sql("select * from cdr_data")

# Append to the partitioned Parquet landing table
(df_insert.write
    .option("compression", "snappy")
    .mode("append")
    .format("parquet")
    .partitionBy("yearkey", "monthkey", "daykey")
    .saveAsTable("landing.test_loading"))

# Note: unpersist() is a no-op here, since neither DataFrame was ever cached
dsCSV.unpersist(True)
df_insert.unpersist(True)
-- cdr_hivesql.sh
#!/bin/bash
v_history_records="insert into staging.test_loading select * from landing.test_loading"
echo "====================>>>`date +%Y%m%d%H%M%S`<<<====================="
echo ""
echo "$v_history_records"
hive -e "$v_history_records;"
Note:
-- landing.test_loading (Parquet format)
-- staging.test_loading (ORC format)
Created ‎10-03-2023 02:07 AM
It appears that you're currently following a two-step process: writing data to a Parquet table and then using that Parquet table to write to an ORC table. You can streamline this by directly writing the data into the ORC format table, eliminating the need to write the same data to a Parquet table before reading it.
Ref -
https://spark.apache.org/docs/2.4.0/sql-data-sources-hive-tables.html
https://docs.cloudera.com/cdp-private-cloud-base/7.1.9/developing-spark-applications/topics/spark-lo...
