Created on 11-13-2019 09:36 PM - edited 11-13-2019 09:38 PM
I'm trying to come up with a generic implementation that uses Spark JDBC to read and write data from/to various JDBC-compliant databases like PostgreSQL, MySQL, Hive, etc.
My code looks something like this:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SaveMode, SparkSession}

val conf = new SparkConf().setAppName("Spark Hive JDBC").setMaster("local[*]")
val sc = new SparkContext(conf)
val spark = SparkSession
  .builder()
  .appName("Spark Hive JDBC Example")
  .getOrCreate()

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://host1:10000/default")
  .option("dbtable", "student1")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  //.option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .load()

jdbcDF.printSchema

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:hive2://127.0.0.1:10000/default")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  //.option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .mode(SaveMode.Overwrite)
  .save()
Schema Structure:
root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- dept: string (nullable = true)
The above code works seamlessly for PostgreSQL and MySQL databases, but it starts causing problems as soon as I use the Hive-related JDBC config.
First, my read was not able to retrieve any data and kept returning empty results. After some searching, I was able to make the read work by adding the custom HiveDialect below, but I'm still facing an issue writing data to Hive.
import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// Quote identifiers with backticks and map Spark's StringType to Hive's STRING.
case object HiveDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = s"`$colName`"

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("STRING", Types.VARCHAR))
    case _          => None
  }
}

// Must be registered before the JDBC read/write runs for it to take effect.
JdbcDialects.registerDialect(HiveDialect)
Error in Write using Cloudera Hive JDBC Driver:
19/11/14 11:02:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '`id`' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 25)
== SQL ==
INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT')
-------------------------^^^
, Query: INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT').
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.pollForOperationCompletion(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatementInternal(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatement(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelper(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelperParameter(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.execute(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executePreparedAnyBatch(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executeBatch(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Error in Write using Hive JDBC Driver:
19/11/13 10:30:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:664)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
How can I execute Hive queries (read/write) from Spark against multiple remote Hive servers using Spark JDBC?
I can't use the Hive metastore URI approach, since in that case I would be restricting myself to a single Hive metastore config. Also, as I mentioned earlier, I want the approach to be generic for all the database types (PostgreSQL, MySQL, Hive), so the Hive metastore URI approach won't work in my case. Kindly suggest.
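For reference, the metastore approach I'm ruling out looks roughly like the sketch below (with a hypothetical metastore host); the whole SparkSession gets bound to that single metastore, which is why it can't target multiple remote Hive servers:

// Sketch of the Hive metastore URI approach (hypothetical thrift://metastore-host:9083):
// hive.metastore.uris ties the entire SparkSession to one metastore, so a second
// remote Hive server would need a second SparkSession with different config.
val hiveSpark = SparkSession
  .builder()
  .appName("Spark Hive Metastore Example")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")
  .enableHiveSupport()
  .getOrCreate()

hiveSpark.sql("SELECT * FROM default.student1").show()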
Dependency Details:
Created 04-21-2020 06:47 AM
Hello @gtmanoj23051988 ,
thank you for the detailed issue description. It seems to be similar to the issue described in this thread, so I'm summarising the solution here for your convenience:
When UseNativeQuery is set to zero or not set at all, the Impala/Hive driver tries to transform queries that are already in Impala/Hive native form, which causes the syntax error.
You can overcome this exception by adding UseNativeQuery=1 to the JDBC connection string.
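For example, applied to the write from your snippet (reusing your jdbcDF; the host, port, table, and credentials are the placeholders from your post, and the Cloudera driver accepts semicolon-separated properties in the connection URL):

// Same write as in the question, with UseNativeQuery=1 appended to the URL so the
// Cloudera driver passes Spark's generated INSERT through without rewriting it.
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:hive2://127.0.0.1:10000/default;UseNativeQuery=1")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  .mode(SaveMode.Overwrite)
  .save()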
Kind regards:
Ferenc
Ferenc Erdelyi, Technical Solutions Manager