
Error while inserting data into Kudu table from Hive table using PySpark

New Contributor

Version Details:
----------------
Cloudera - CDH 5.14
Apache Impala -              impala-2.11.0+cdh5.14.0+0
Apache Kudu - kudu-1.6.0+cdh5.14.0+0
Apache Spark - spark-1.6.0+cdh5.14.0+537

Problem trying to solve:
------------------------
We have change records (delta records) in an HDFS Hive table.
We are trying to upsert these change records, which contain both updates and inserts, into a Kudu table using PySpark 1.6.

Steps executed:
---------------
Case 1:
csv_df = hive_context.read.format("com.databricks.spark.csv").load("%s" % csv_file_path)
csv_df.write.format('org.apache.kudu.spark.kudu').option('kudu.master', kudu_master).option('kudu.table',kudu_table_name).mode("append").save()

Case 2:
In the second case we registered the DataFrame as a temporary table and tried to insert from it into the Kudu table.
csv_df.registerTempTable('makt_kudu_temp')
sqlContext.sql("""insert into <makt_db_name>.<makt_table_name> select * from makt_kudu_temp""")


In both cases we got the error below:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o48.sql.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.kudu.mapreduce.KuduTableInputFormat
        at org.apache.hadoop.hive.ql.metadata.Table.getInputFormatClass(Table.java:308)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:358)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:341)
        at scala.Option.map(Option.scala:145)
  

We also get the same error when we try to perform an "upsert" operation. (Can upserts not be run through sqlContext?)


Finally, we also tried CASTing every column of the table, but that did not help either.
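For context, an upsert is an insert-or-update keyed on the primary key, which is why a plain `insert into` (or `mode("append")`) is not equivalent. A minimal pure-Python sketch of the semantics being asked for (illustrative only, no Kudu or Spark involved):

```python
# Illustrative only: mimics Kudu UPSERT semantics with a plain dict.
def upsert(table, rows, key="id"):
    """Insert-or-update each row into `table`, a dict keyed on the primary key."""
    for row in rows:
        table[row[key]] = row  # existing key -> update, new key -> insert
    return table

kudu_like = {1: {"id": 1, "name": "old"}}
upsert(kudu_like, [{"id": 1, "name": "new"}, {"id": 2, "name": "b"}])
# key 1 is updated in place, key 2 is inserted
```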

4 Replies

Rising Star

AFAIK there isn't any official guidance for using PySpark with Kudu. That said, have you taken a look at this repo that has some info on using Kudu with PySpark?

New Contributor

Hi,

 

Were you able to resolve the issue? If so, how did you do it? I am having the same issue.

 

Thanks,

New Contributor

Use the Spark session instead of the Hive context (tested on Apache Spark 2.3).

Below is the code for your reference:

-------------------------------------

Read a Kudu table from PySpark with the code below:

 

kuduDF = spark.read.format('org.apache.kudu.spark.kudu') \
    .option('kudu.master', "IP of master") \
    .option('kudu.table', "impala::<table name>") \
    .load()

kuduDF.show(5)

 

Write to a Kudu table with the code below:

 

DF.write.format('org.apache.kudu.spark.kudu') \
    .option('kudu.master', "IP of master") \
    .option('kudu.table', "impala::<table name>") \
    .mode("append").save()

----------------------------------------

Reference link: https://medium.com/@sciencecommitter/how-to-read-from-and-write-to-kudu-tables-in-pyspark-via-impala...

 

If you want to use Scala, below is the reference link:

https://kudu.apache.org/docs/developing.html
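A note on the table name in the snippets above: Kudu tables created through Impala are registered under an `impala::database.table` name prefix. A small hypothetical helper (the function and all names in it are illustrative, not part of any API) that builds the option map the kudu-spark data source expects:

```python
# Hypothetical helper: builds the options passed to the kudu-spark data source.
# Kudu tables created via Impala carry an "impala::db.table" name prefix.
def kudu_options(master_addrs, database, table):
    return {
        "kudu.master": ",".join(master_addrs),  # comma-separated master addresses
        "kudu.table": "impala::{}.{}".format(database, table),
    }

opts = kudu_options(["kudu-master-1:7051"], "makt_db", "makt_table")
# then: spark.read.format('org.apache.kudu.spark.kudu').options(**opts).load()
```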

New Contributor

Please add kudu-spark2-tools_2.11-1.6.0.jar, choosing the build that matches your Spark and Scala versions.
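A sketch of how that jar can be supplied at submit time (the jar path, script name, and Spark 2.x / Scala 2.11 build here are assumptions; the command is printed rather than executed):

```shell
# Assumed paths and jar build (Spark 2.x / Scala 2.11); adjust to your cluster.
SUBMIT_CMD="spark2-submit --jars /opt/jars/kudu-spark2-tools_2.11-1.6.0.jar kudu_upsert_job.py"
echo "$SUBMIT_CMD"
```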
