
Spark JDBC to Read and Write from and to Hive


I'm trying to come up with a generic implementation that uses Spark JDBC to read and write data from/to various JDBC-compliant databases like PostgreSQL, MySQL, Hive, etc.

 

My code looks something like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SaveMode, SparkSession}

val conf = new SparkConf().setAppName("Spark Hive JDBC").setMaster("local[*]")
val sc = new SparkContext(conf)

val spark = SparkSession
  .builder()
  .appName("Spark Hive JDBC Example")
  .getOrCreate()

// Read from Hive over JDBC.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://host1:10000/default")
  .option("dbtable", "student1")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver") // or "org.apache.hadoop.hive.jdbc.HiveDriver"
  .load()

jdbcDF.printSchema

// Write back to Hive over JDBC.
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:hive2://127.0.0.1:10000/default")
  .option("dbtable", "student2")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "com.cloudera.hive.jdbc4.HS2Driver") // or "org.apache.hadoop.hive.jdbc.HiveDriver"
  .mode(SaveMode.Overwrite)
  .save()

 

Schema Structure:

root
    |-- name: string (nullable = true)
    |-- id: integer (nullable = true)
    |-- dept: string (nullable = true)

 

The above code works seamlessly for PostgreSQL and MySQL, but it starts causing problems as soon as I use the Hive-related JDBC config.
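For instance, the PostgreSQL variant only needs different connection options (hypothetical host and credentials):

val pgDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://pghost:5432/mydb") // hypothetical host/db
  .option("dbtable", "student1")
  .option("user", "postgres")     // hypothetical credentials
  .option("password", "postgres")
  .option("driver", "org.postgresql.Driver")
  .load()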

 

At first, the read wasn't returning any data, only empty results. After some searching, I was able to get the read working by registering the custom HiveDialect below (presumably because the default dialect quotes identifiers with double quotes, which HiveServer2 doesn't treat as column references), but I'm still facing an issue when writing data to Hive.

 

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

case object HiveDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  // Hive expects identifiers quoted with backticks, not double quotes.
  override def quoteIdentifier(colName: String): String = s"`$colName`"

  // Map Spark's StringType to Hive's STRING instead of the default TEXT.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("STRING", Types.VARCHAR))
    case _ => None
  }
}

JdbcDialects.registerDialect(HiveDialect)
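As far as I can tell, the dialect only changes how Spark quotes identifiers in the SQL it generates, which is easy to sanity-check:

// With HiveDialect registered, the read generates roughly
//   SELECT `name`,`id`,`dept` FROM student1
// instead of the default
//   SELECT "name","id","dept" FROM student1
println(HiveDialect.quoteIdentifier("name")) // prints: `name`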

 

Error in Write using Cloudera Hive JDBC Driver:

 

19/11/14 11:02:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '`id`' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 25)
== SQL ==
INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT')
-------------------------^^^
, Query: INSERT INTO empdata_bkp (`id`,`name`,`dept`) VALUES (123,'siva','CS') ,(345,'ganesh','CS') ,(234,'mani','IT').
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.pollForOperationCompletion(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatementInternal(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.api.HS2Client.executeStatement(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelper(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.executeHelperParameter(Unknown Source)
at com.cloudera.hiveserver2.hivecommon.dataengine.HiveJDBCNativeQueryExecutor.execute(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executePreparedAnyBatch(Unknown Source)
at com.cloudera.hiveserver2.jdbc.common.SPreparedStatement.executeBatch(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

 

Error in Write using Hive JDBC Driver:

 

19/11/13 10:30:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:664)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
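
Both stack traces point at Spark's JdbcUtils.savePartition, which writes through PreparedStatement batches: the Apache Hive driver throws because it doesn't implement addBatch, and the Cloudera driver's generated INSERT is rejected by the server-side parser. A possible workaround might be to bypass Spark's JDBC write path entirely and issue plain single-row INSERT statements from foreachPartition. A rough sketch (hypothetical: assumes the Apache Hive driver, that student2's column order matches the DataFrame, and that values need no escaping):

import java.sql.DriverManager
import org.apache.spark.sql.Row

jdbcDF.foreachPartition { (rows: Iterator[Row]) =>
  // Ensure the driver class is loaded on the executor.
  Class.forName("org.apache.hive.jdbc.HiveDriver")

  // One connection per partition; a plain Statement avoids the
  // unimplemented PreparedStatement.addBatch.
  val conn = DriverManager.getConnection(
    "jdbc:hive2://127.0.0.1:10000/default", "hive", "hive")
  try {
    val stmt = conn.createStatement()
    rows.foreach { row =>
      val name = row.getAs[String]("name")
      val id   = row.getAs[Int]("id")
      val dept = row.getAs[String]("dept")
      // Single-row INSERT without a column list, since the quoted
      // column list is what the parser rejected above.
      stmt.executeUpdate(s"INSERT INTO student2 VALUES ('$name', $id, '$dept')")
    }
    stmt.close()
  } finally {
    conn.close()
  }
}

Row-by-row inserts are painfully slow on Hive, though, so this would only be a stopgap.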

 

How can I execute Hive queries (read/write) from Spark against multiple remote Hive servers using Spark JDBC?

 

I can't use the Hive metastore URI approach since, in that case, I would be restricted to a single Hive metastore config. Also, as I mentioned earlier, I want the approach to be generic across all the database types (PostgreSQL, MySQL, Hive), so the metastore URI approach won't work for me. Kindly suggest.

 

Dependency Details:

  • Scala version: 2.11
  • Spark version: 2.4.3
  • Hive version: 2.1.1
  • Hive JDBC Driver Used: 2.0.1
  • Cloudera Hive JDBC Driver Used: 4-2.6.5.1007
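
For reference, a minimal sbt setup for the Apache driver path might look like this (the Cloudera driver isn't on Maven Central, so it has to be added as an unmanaged jar):

scalaVersion := "2.11.12" // e.g., any Scala 2.11.x

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.hive"  %  "hive-jdbc" % "2.0.1"
)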