
Spark JDBC to Read and Write from and to Hive

New Contributor

I'm trying to come up with a generic implementation that uses Spark JDBC to read and write data from/to various JDBC-compliant databases such as PostgreSQL, MySQL, and Hive.

 

My code looks something like the following.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SaveMode, SparkSession}

val conf = new SparkConf().setAppName("Spark Hive JDBC").setMaster("local[*]")
val sc = new SparkContext(conf)
val spark = SparkSession
  .builder()
  .appName("Spark Hive JDBC Example")
  .getOrCreate()

// Read from Hive over JDBC.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://host1:10000/default")
  .option("dbtable", "student1")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  // .option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .load()

jdbcDF.printSchema

// Write back to Hive over JDBC.
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:hive2://127.0.0.1:10000/default")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  // .option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .mode(SaveMode.Overwrite)
  .save()

 

Schema Structure:

root
    |-- name: string (nullable = true)
    |-- id: integer (nullable = true)
    |-- dept: string (nullable = true)

 

The above code works seamlessly for PostgreSQL and MySQL databases, but it starts causing problems as soon as I use the Hive-related JDBC config.

 

First, the read was not returning any data at all (empty results). After some searching, I was able to make the read work by registering the custom HiveDialect below; Spark's default JDBC dialect quotes identifiers with double quotes, which Hive does not interpret as column references, so overriding quoteIdentifier to use backticks fixes the read. However, I'm still facing an issue when writing data to Hive.

 

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// Quote identifiers with backticks and map StringType to Hive's STRING
// (Spark's default JDBC mapping is TEXT, which Hive does not support).
case object HiveDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
  override def quoteIdentifier(colName: String): String = s"`$colName`"
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("STRING", Types.VARCHAR))
    case _ => None
  }
}

// Register before any spark.read / df.write that should pick it up.
JdbcDialects.registerDialect(HiveDialect)

 

Error in Write using Cloudera Hive JDBC Driver:

 

19/11/14 11:02:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '`id`' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 25)
== SQL ==
INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT')
-------------------------^^^
, Query: INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT').
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.pollForOperationCompletion(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatementInternal(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatement(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelper(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelperParameter(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.execute(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executePreparedAnyBatch(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executeBatch(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

 

Error in Write using Hive JDBC Driver:

 

19/11/13 10:30:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:664)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)

 

How can I execute Hive queries (read/write) from Spark against multiple remote Hive servers using Spark JDBC?

 

I can't use the Hive metastore URI approach since, in that case, I would be restricting myself to a single Hive metastore config. Also, as I mentioned earlier, I want the approach to be generic across all the database types (PostgreSQL, MySQL, Hive), so the Hive metastore URI approach won't work in my case; a sketch of what I'm aiming for is below. Kindly suggest.
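
For illustration, here is a minimal sketch of the kind of generic wrapper I'm aiming for (the JdbcSource case class and the readTable/writeTable helper names are placeholders of my own, not part of any library):

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// One JdbcSource instance per target database (PostgreSQL, MySQL, Hive, ...).
case class JdbcSource(url: String, driver: String, table: String,
                      user: String, password: String)

def readTable(spark: SparkSession, src: JdbcSource): DataFrame =
  spark.read
    .format("jdbc")
    .option("url", src.url)
    .option("driver", src.driver)
    .option("dbtable", src.table)
    .option("user", src.user)
    .option("password", src.password)
    .load()

def writeTable(df: DataFrame, src: JdbcSource): Unit =
  df.write
    .format("jdbc")
    .option("url", src.url)
    .option("driver", src.driver)
    .option("dbtable", src.table)
    .option("user", src.user)
    .option("password", src.password)
    .mode(SaveMode.Overwrite)
    .save()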

 

Dependency Details:

  • Scala version: 2.11
  • Spark version: 2.4.3
  • Hive version: 2.1.1
  • Hive JDBC Driver Used: 2.0.1
  • Cloudera Hive JDBC Driver Used: 4-2.6.5.1007
1 REPLY

Moderator

Hello @gtmanoj23051988 ,

 

thank you for the detailed issue description. It seems to be similar to the issue described in this thread, hence I am just summarising the solution here for your convenience:

 

When UseNativeQuery is set to zero or not set at all, the Impala/Hive driver tries to transform queries that are already in the native Impala/Hive form, which causes the syntax error.

 

You can overcome this exception by adding UseNativeQuery=1 to the JDBC connection string, as sketched below.
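
For example (a sketch only, assuming the Cloudera driver's semicolon-delimited connection-string syntax and reusing the host, table, and credentials from your post):

// Append UseNativeQuery=1 so the driver passes the SQL through untransformed.
val hiveUrl = "jdbc:hive2://host1:10000/default;UseNativeQuery=1"

jdbcDF.write
  .format("jdbc")
  .option("url", hiveUrl)
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .mode(SaveMode.Overwrite)
  .save()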

 

Kind regards,

Ferenc

Ferenc Erdelyi, Technical Solutions Manager
