Parquet is not a parquet file (too small)

Contributor

I have more than 1000 tables in Hive. All the tables have the same columns. Now I want to create one big table by selecting * from each table.

I have used PySpark to do this.

The following is the spark-submit script.

#!/bin/bash

[ $# -ne 2 ] && { echo "Usage : $0 pkgsn repository "; exit 1; }

pkgsn=$1
repository=$2

TIMESTAMP=`date "+%Y-%m-%d"`
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log

function log_status
{
    status=$1
    message=$2
    if [ "$status" -ne 0 ]; then
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
        #echo "Please find the attached log file for more details"
        exit 1
    else
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
    fi
}

job_name=${pkgsn}_${repository}

# Run the Spark job with the Python script spark.py, passing pkgsn and repository as arguments
echo "Step 1 : Running Spark Job"
spark-submit --num-executors 1 --driver-memory 4g --executor-memory 2g --executor-cores 1 --name "${pkgsn}_${repository}" --master "yarn-client" /home/$USER/spark.py ${pkgsn} ${repository}


g_STATUS=$?
log_status $g_STATUS "sparkjob ${job_name}"

echo "*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-"


The spark.py file:

#!/usr/bin/env python
import numpy as np
import sys
import os

# Require exactly two arguments on the spark-submit command line: pkgsn and repository
if len(sys.argv) != 3:
    print("Invalid number of args......")
    print("Usage: spark-submit spark.py pkgsn repository")
    sys.exit(1)

pkgsn=sys.argv[1]
repository=sys.argv[2]

# Import pyspark functions
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(conf=conf)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

# Create a DataFrame from the Hive table:
data = sqlContext.table("history_eventlog.{}_{}".format(pkgsn,repository))

data.registerTempTable('temptable')

# create a new table in hive
sqlContext.sql('create external table if not exists testing.{}(iim_time string, plc_time string, package string, group string, tag_name string, tag_value float, ngp float) row format delimited fields terminated by "," stored as parquet'.format(repository))

# Populate the hive table with data in dataframe
sqlContext.sql('insert into table testing.{} select * from temptable'.format(repository))

sc.stop()


I am able to create the big table in text file format. But when I try to create it in the Parquet file format, I get a lot of empty Parquet files. When I query the table, it fails with ".parquet is not a Parquet file (too small)", pointing at the location of the empty Parquet files.
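A note on the message itself: a Parquet file begins and ends with the 4-byte magic PAR1, and a 4-byte footer length sits just before the trailing magic, so a file shorter than 12 bytes cannot be valid and readers reject it as too small. The sketch below is one way to list such files. It assumes the table directory is readable as a local path (for HDFS the files would first need to be copied out, for example with hdfs dfs -get), and the function names are made up for illustration.

```python
import os

PARQUET_MAGIC = b"PAR1"
# Leading magic (4 bytes) + footer length (4 bytes) + trailing magic (4 bytes)
MIN_PARQUET_SIZE = 12


def is_plausible_parquet(path):
    """Return True if the file is long enough and has the PAR1 magic at both ends."""
    if os.path.getsize(path) < MIN_PARQUET_SIZE:
        return False
    with open(path, "rb") as handle:
        head = handle.read(4)
        handle.seek(-4, os.SEEK_END)
        tail = handle.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC


def find_suspect_files(directory):
    """List data files under `directory` that a Parquet reader would likely reject."""
    suspects = []
    for root, _dirs, files in os.walk(directory):
        for name in sorted(files):
            # Skip hidden and marker files such as _SUCCESS or .crc files
            if name.startswith("_") or name.startswith("."):
                continue
            full_path = os.path.join(root, name)
            if not is_plausible_parquet(full_path):
                suspects.append(full_path)
    return suspects
```

This only checks the size and the magic bytes, not the footer metadata, so a file that passes may still be corrupt in other ways.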

Moreover, when I use Spark to create Hive tables in text format, everything is fine. But when I create them in the Parquet file format, this issue arises.

Why am I getting this error? Is there a way to fix this in Spark 1.6?

 

2 REPLIES

Champion
What I am reading is that you are passing the info to query a single table of the 1000 and insert it into your bigger table, is that right? So you would launch this script and Spark job 1000 times. I recommend a different approach to make better use of Spark, and I think I have the solution to your issue.

Warning: my last experience with Spark, Hive, and Parquet was in Spark 1.6.0, and Parquet took a lot of memory due to how the writers behave.

I recommend that you change the job to create a union of all the DFs. In the Spark application you would loop through each table, read the data, and union it with the previous result. This will be heavy on memory usage to hold all of it, but it is a more efficient use of Spark.
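A minimal sketch of that union approach, not tested on a cluster. It assumes the sqlContext (a HiveContext) from the original script is passed in, that the target table already exists, and that every source table has the same columns in the same order. The function names and the temp table name all_sources are made up for illustration.

```python
from functools import reduce


def union_tables(sql_context, table_names):
    """Read each Hive table and combine them with unionAll (Spark 1.6 DataFrame API)."""
    frames = [sql_context.table(name) for name in table_names]
    if not frames:
        raise ValueError("no source tables given")
    # unionAll matches columns by position, so all tables must share one schema
    return reduce(lambda left, right: left.unionAll(right), frames)


def build_big_table(sql_context, table_names, target_table):
    """Insert the combined rows into an existing Hive table with one statement."""
    combined = union_tables(sql_context, table_names)
    combined.registerTempTable("all_sources")
    sql_context.sql(
        "insert into table {} select * from all_sources".format(target_table)
    )
    return combined
```

In the driver this would be called once, for example build_big_table(sqlContext, names, "testing.{}".format(repository)), where names is a list of the "history_eventlog.<pkgsn>_<repository>" table names. That way a single Spark job does one insert instead of 1000 separate ones.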

I can't get into a spark-shell right now, but this doesn't look right. Format is a method of a DF, but you have it just after the SQL statement. What are you passing to 'repository'? Are the source tables in Parquet format already?

sqlContext.sql('create external table if not exists testing.{}(iim_time string, plc_time string, package string, group string, tag_name string, tag_value float, ngp float) row format delimited fields terminated by "," stored as parquet'.format(repository))
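A note on the .format call in the statement above: because it is attached to the string literal, it is Python's built-in str.format, which fills the {} placeholder before the string ever reaches sqlContext.sql. A small sketch with a shortened statement and a hypothetical repository value:

```python
def build_create_statement(repository):
    """Fill the table name into the SQL text using Python's str.format."""
    template = (
        "create external table if not exists testing.{}"
        "(iim_time string, tag_value float) stored as parquet"
    )
    return template.format(repository)


# "repo_a" is a placeholder value, not one taken from the thread
statement = build_create_statement("repo_a")
print(statement)
```

The resulting string is plain SQL text, so no DataFrame method is involved at that point.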

Contributor

Yes, the source tables are in Parquet.

 

Could you please provide a sample solution for this issue?