
Spark JDBC to Read and Write from and to Hive


New Contributor

I'm trying to come up with a generic implementation to use Spark JDBC to support Read/Write data from/to various JDBC compliant databases like PostgreSQL, MySQL, Hive, etc.

 

My code looks something like the following:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .appName("Spark Hive JDBC Example")
  .master("local[*]")
  .getOrCreate()

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://host1:10000/default")
  .option("dbtable", "student1")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  //.option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .load()

jdbcDF.printSchema

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:hive2://127.0.0.1:10000/default")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  //.option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .mode(SaveMode.Overwrite)
  .save()

 

Schema Structure:

root
    |-- name: string (nullable = true)
    |-- id: integer (nullable = true)
    |-- dept: string (nullable = true)

 

The above code works seamlessly for PostgreSQL and MySQL databases, but it starts causing problems as soon as I switch to the Hive-related JDBC config.

 

First, my read was not returning any data, only empty results. After some searching, I was able to make reads work by adding a custom HiveDialect, but I'm still facing an issue when writing data to Hive.

 

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

case object HiveDialect extends JdbcDialect {
  // Match any HiveServer2 JDBC URL.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
  // Hive quotes identifiers with backticks, not double quotes.
  override def quoteIdentifier(colName: String): String = s"`$colName`"
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("STRING", Types.VARCHAR))
    case _ => None
  }
}

JdbcDialects.registerDialect(HiveDialect)
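As a quick stand-alone illustration of what this dialect changes: the backticks in the generated INSERT statements come from `quoteIdentifier`. The sketch below mirrors that quoting rule without depending on Spark (the table name is the one from the error log; the demo itself is hypothetical):

```scala
// Stand-alone illustration of the dialect's quoting rule; this mirrors
// quoteIdentifier above but does not require Spark on the classpath.
object QuotingDemo {
  def quote(col: String): String = s"`$col`"

  def main(args: Array[String]): Unit = {
    val cols = Seq("id", "name", "dept").map(quote).mkString(",")
    val sql  = s"INSERT INTO empdata_bkp ($cols) VALUES (?,?,?)"
    println(sql) // INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (?,?,?)
  }
}
```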

 

Error in Write using Cloudera Hive JDBC Driver:

 

19/11/14 11:02:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '`id`' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 25)
== SQL ==
INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT')
-------------------------^^^
, Query: INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT').
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.pollForOperationCompletion(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatementInternal(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatement(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelper(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelperParameter(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.execute(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executePreparedAnyBatch(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executeBatch(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

 

Error in Write using Hive JDBC Driver:

 

19/11/13 10:30:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:664)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)

 

How can I execute Hive queries (read/write) from Spark against multiple remote Hive servers using Spark JDBC?

 

I can't use the Hive metastore URI approach, since that would restrict me to a single Hive metastore config. Also, as I mentioned earlier, I want the approach to be generic across all the database types (PostgreSQL, MySQL, Hive), so the metastore URI approach won't work in my case. Kindly suggest.
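A minimal sketch of the kind of generic, driver-keyed setup the question describes. All hosts, credentials, and names here are hypothetical placeholders, not part of any Spark API:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical, driver-agnostic connection descriptor: one entry per backend.
case class JdbcSource(url: String, driver: String, user: String, password: String)

val sources: Map[String, JdbcSource] = Map(
  "postgres" -> JdbcSource("jdbc:postgresql://pg-host:5432/mydb",
                           "org.postgresql.Driver", "pguser", "secret"),
  "mysql"    -> JdbcSource("jdbc:mysql://mysql-host:3306/mydb",
                           "com.mysql.cj.jdbc.Driver", "myuser", "secret"),
  "hive"     -> JdbcSource("jdbc:hive2://host1:10000/default",
                           "com.cloudera.hive.jdbc4.HS2Driver", "hive", "hive")
)

// Same read path for every backend; only the descriptor differs.
def readTable(spark: SparkSession, src: JdbcSource, table: String): DataFrame =
  spark.read
    .format("jdbc")
    .option("url", src.url)
    .option("driver", src.driver)
    .option("user", src.user)
    .option("password", src.password)
    .option("dbtable", table)
    .load()
```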

 

Dependency Details:

  • Scala version: 2.11
  • Spark version: 2.4.3
  • Hive version: 2.1.1
  • Hive JDBC Driver Used: 2.0.1
  • Cloudera Hive JDBC Driver Used: 4-2.6.5.1007

Re: Spark JDBC to Read and Write from and to Hive

Moderator

Hello @gtmanoj23051988 ,

 

Thank you for the detailed issue description. It seems similar to the issue described in this thread, so I'm summarising the solution here for your convenience:

 

When UseNativeQuery is set to zero (or not set at all), the Impala/Hive driver tries to transform queries that are already in native Impala/Hive form, which causes the syntax error.

 

You can overcome this exception by adding UseNativeQuery=1 to the JDBC connection string.
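For the write path from the question, that means appending the flag to the connection URL, along these lines (host, port, and credentials as in the original post):

```scala
// UseNativeQuery=1 instructs the Cloudera Hive driver to pass the generated
// SQL through as-is, instead of re-translating the already-native INSERT.
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:hive2://127.0.0.1:10000/default;UseNativeQuery=1")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver")
  .mode(SaveMode.Overwrite)
  .save()
```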

 

Kind regards:

Ferenc

Ferenc Erdelyi, Technical Solutions Manager
