Created on 02-04-2017 08:50 PM - edited 09-16-2022 04:01 AM
I have more than 1000 tables in Hive, all with the same columns. Now I want to create one big table by selecting * from each of them.
I am using PySpark to do this.
The following is the spark-submit wrapper script:
#!/bin/bash
[ $# -ne 2 ] && { echo "Usage: $0 pkgsn repository"; exit 1; }

pkgsn=$1
repository=$2

TIMESTAMP=$(date "+%Y-%m-%d")
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log

# Log success/failure of the previous command and abort on failure
function log_status
{
    status=$1
    message=$2
    if [ "$status" -ne 0 ]; then
        echo "$(date +"%Y-%m-%d %H:%M:%S") [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
        #echo "Please find the attached log file for more details"
        exit 1
    else
        echo "$(date +"%Y-%m-%d %H:%M:%S") [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
    fi
}

job_name=${pkgsn}_${repository}

# Execute the Spark job, passing spark.py the pkgsn and repository values as arguments
echo "Step 1 : Running Spark Job"
spark-submit --num-executors 1 --driver-memory 4g --executor-memory 2g --executor-cores 1 \
    --name "${job_name}" --master yarn-client /home/$USER/spark.py ${pkgsn} ${repository}
g_STATUS=$?
log_status $g_STATUS "sparkjob ${job_name}"
echo "*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-"
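For reference, the wrapper takes the two positional arguments shown in the usage line, so a run for one source table would be invoked something like ./submit_table.sh <pkgsn> <repository> (the script name and argument values here are placeholders); each invocation processes a single Hive source table.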
The spark.py file:
#!/usr/bin/env python
import sys

# Require exactly two arguments on the spark-submit command line
if len(sys.argv) != 3:
    print "Invalid number of arguments"
    print "Usage: spark-submit spark.py pkgsn repository"
    sys.exit(1)

pkgsn = sys.argv[1]
repository = sys.argv[2]

# Import pyspark modules and build the contexts
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Create a DataFrame from the Hive source table and register it as a temp table
data = sqlContext.table("history_eventlog.{}_{}".format(pkgsn, repository))
data.registerTempTable('temptable')

# Create the target table in Hive if it does not exist (`group` is a reserved word, hence the backquotes)
sqlContext.sql('create external table if not exists testing.{}(iim_time string, plc_time string, package string, `group` string, tag_name string, tag_value float, ngp float) row format delimited fields terminated by "," stored as parquet'.format(repository))

# Populate the Hive table with the contents of the DataFrame
sqlContext.sql('insert into table testing.{} select * from temptable'.format(repository))

sc.stop()
I am able to create the big table when I use a text file format. But when I try to create it in Parquet format, I end up with a lot of empty Parquet files, and querying the table fails with an error like "... .parquet is not a Parquet file (too small)" pointing at the locations of those empty files.
Moreover, when I use Spark to create the Hive tables in text format everything works fine; the issue only appears when I create them in Parquet format.
Why am I getting this error, and is there a way to fix it in Spark 1.6?
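For comparison, here is a minimal sketch (assuming the testing database already exists; the table names simply mirror the script above) that lets Spark's DataFrameWriter create and populate the Parquet table itself instead of going through the hand-written DDL:
#!/usr/bin/env python
# Sketch only: same job, but the Parquet table is created by Spark's DataFrameWriter
# rather than a manual "row format delimited ... stored as parquet" DDL.
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

pkgsn = sys.argv[1]
repository = sys.argv[2]

sc = SparkContext(conf=SparkConf())
sqlContext = HiveContext(sc)

data = sqlContext.table("history_eventlog.{}_{}".format(pkgsn, repository))
# saveAsTable with mode("append") creates testing.<repository> on the first run
# and appends on later runs; Spark writes the Parquet files and footers itself.
data.write.mode("append").format("parquet").saveAsTable("testing.{}".format(repository))
sc.stop()
Whether this actually avoids the zero-length files in this environment would still need to be verified, but it keeps the schema and the Parquet file layout entirely on the Spark side.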
Created 02-05-2017 10:23 PM
Yes, the source tables are in Parquet.
Could you please provide a sample solution for this issue?