New Contributor
Posts: 1
Registered: ‎06-06-2018

Error while inserting data into Kudu table from Hive table using PySpark

Version Details:
Cloudera - CDH 5.14
Apache Impala - impala-2.11.0+cdh5.14.0+0
Apache Kudu - kudu-1.6.0+cdh5.14.0+0
Apache Spark - spark-1.6.0+cdh5.14.0+537

Problem we are trying to solve:
We have change records (delta records) in a Hive table on HDFS.
We are trying to upsert these change records, which contain both updates and inserts, into a Kudu table using PySpark 1.6.

Steps executed:
Case 1:
csv_df = sqlContext.read.format("com.databricks.spark.csv").load(csv_file_path)
csv_df.write.format('org.apache.kudu.spark.kudu').option('kudu.master', kudu_master).option('kudu.table',kudu_table_name).mode("append").save()

Case 2:
In the second case we registered the DataFrame as a temp table and tried to insert its rows into the Kudu table.
sqlContext.sql("""insert into <makt_db_name>.<makt_table_name> select * from makt_kudu_temp""")

In both cases we got the error below.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/", line 813, in __call__
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/", line 45, in deco
    return f(*a, **kw)
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o48.sql.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.kudu.mapreduce.KuduTableInputFormat
        at org.apache.hadoop.hive.ql.metadata.Table.getInputFormatClass(
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:358)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:341)

2. We also get the error when we try an "upsert" operation. (Can upsert not be run through sqlContext?)

Finally, we also tried casting each and every column of the table explicitly, but that did not help either.

Cloudera Employee
Posts: 28
Registered: ‎02-23-2017

Re: Error while inserting data into Kudu table from Hive table using PySpark

AFAIK there isn't any official guidance for using PySpark with Kudu. That said, have you taken a look at this repo that has some info on using Kudu with PySpark?
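
For reference, the DataFrame-writer approach from Case 1 can be sketched as a small helper. This is only a sketch under assumptions, not official guidance: the function name write_df_to_kudu is hypothetical, it assumes the kudu-spark jar is already on the Spark classpath, and it assumes the target Kudu table already exists. As far as I know, kudu-spark treats "append" mode as an upsert by default, but please verify that against your version.

```python
def write_df_to_kudu(df, kudu_master, kudu_table_name):
    """Write a DataFrame into an existing Kudu table through the
    kudu-spark data source, without going through sqlContext.sql().

    Hypothetical helper for illustration; the option names are the
    ones already used in Case 1 of the question.
    """
    (df.write
       .format("org.apache.kudu.spark.kudu")
       .option("kudu.master", kudu_master)
       .option("kudu.table", kudu_table_name)
       .mode("append")  # assumption: kudu-spark maps append to upsert
       .save())


# Usage sketch (needs a live Spark + Kudu environment, so left commented out).
# The database/table names and master address are placeholders:
#
#   delta_df = sqlContext.table("<hive_db_name>.<delta_table_name>")
#   write_df_to_kudu(delta_df, "<kudu_master_host>:7051",
#                    "impala::<makt_db_name>.<makt_table_name>")
#
# Note: tables created through Impala are usually named
# "impala::<db>.<table>" on the Kudu side.
```

The point of routing the write through the data source rather than an INSERT statement is that the failing call in the traceback is sqlContext.sql(), which looks the table up in the Hive metastore and then needs org.apache.kudu.mapreduce.KuduTableInputFormat on the classpath.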