I tried to create a function which gets data from a relational database and inserts it into a Hive table. Since I use Spark 1.6, I need to register a temporary table, because writing the DataFrame directly as a Hive table is not compatible with Hive:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
sqlContext = HiveContext(sc)
query = "(select * from employees where emp_no < 10008) as emp_alias"
df = sqlContext.read.format("jdbc").option("url", url) \
    .option("dbtable", query) \
    .option("user", user) \
    .option("password", password) \
    .load()
df.registerTempTable("tempEmp")  # register so the SQL below can reference it
sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')
The employees table in the relational database contains a few thousand records. After running my program I can see that two Parquet files are created:

- one file, created right after my code finishes
- another file, created two hours later

So when I select from the Hive table right after the job completes, records are missing.
I have several ideas about what could cause the problem:
Could it be caused by the lazy evaluation of registerTempTable? Does Spark think I don't use those records? I am familiar with lazy evaluation in generators, but I can't imagine how exactly lazy evaluation works with the registerTempTable function.
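To make the analogy concrete, this is the generator behaviour I have in mind: nothing is produced until something consumes it, and if consumption stops early, the remaining values are simply never computed. My worry is that Spark transformations behave the same way. (Plain-Python sketch, independent of Spark:)

```python
def read_rows():
    # Simulates fetching rows from a database: each row is
    # produced only when something actually consumes it.
    for i in range(5):
        print("fetching row", i)
        yield i

rows = read_rows()                    # nothing fetched yet -- lazy
first_two = [next(rows), next(rows)]  # only rows 0 and 1 are fetched
print(first_two)                      # [0, 1]; rows 2-4 never materialize
```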
Does Spark save the temporary tables in a tmp folder? Could the problem be caused by insufficient space there? Should I use the dropTempTable function?
Is it safer to use createOrReplaceTempView (even though registerTempTable is only deprecated as of Spark 2)?
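To make the first two ideas concrete, this is the kind of change I am considering: forcing an action before the insert so the JDBC read runs eagerly, and dropping the temp table afterwards. This is only a sketch, reusing `sqlContext` and `df` from the code above:

```python
df.registerTempTable("tempEmp")
# Action: forces the JDBC read to run now and reports how many rows
# Spark actually fetched, so I can compare against the source table.
print(df.count())
sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')
# Idea 2: release the temporary table once the insert is done.
sqlContext.dropTempTable("tempEmp")
```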
I am using Spark 1.6 on YARN (Hadoop 2.6.0-cdh5.8.0), running multiple jobs with different HiveContexts, but I don't access the temporary tables across contexts.