Created on 06-06-2018 04:56 AM - edited 09-16-2022 06:18 AM
Version Details:
----------------
Cloudera - CDH 5.14
Apache Impala - impala-2.11.0+cdh5.14.0+0
Apache Kudu - kudu-1.6.0+cdh5.14.0+0
Apache Spark - spark-1.6.0+cdh5.14.0+537
Problem trying to solve:
------------------------
We have change records (delta records) in a Hive table on HDFS.
We are trying to upsert these change records, which contain both updates and inserts, into a Kudu table using PySpark 1.6.
Steps executed:
---------------
Case 1:
csv_df = hive_context.read.format("com.databricks.spark.csv").load("%s" % csv_file_path)
csv_df.write.format('org.apache.kudu.spark.kudu').option('kudu.master', kudu_master).option('kudu.table',kudu_table_name).mode("append").save()
Case 2:
In the second case we created a temp table from the DataFrame and tried to insert its contents into the Kudu table.
csv_df.registerTempTable('makt_kudu_temp')
sqlContext.sql("""insert into <makt_db_name>.<makt_table_name> select * from makt_kudu_temp""")
In both cases we got the error below.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/context.py", line 580, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o48.sql.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.kudu.mapreduce.KuduTableInputFormat
at org.apache.hadoop.hive.ql.metadata.Table.getInputFormatClass(Table.java:308)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:358)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:341)
at scala.Option.map(Option.scala:145)
2. We also get the error with "Upsert" when we try that operation. (Can upsert not be run through sqlContext?)
Finally, we also tried casting every column of the table, but that did not help either.
Created 06-08-2018 07:16 PM
AFAIK there isn't any official guidance for using PySpark with Kudu. That said, have you taken a look at this repo that has some info on using Kudu with PySpark?
Created 05-24-2019 02:44 PM
Hi,
Were you able to resolve the issue? If yes, how did you do it? I am also having the same issue.
Thanks,
Created 02-20-2020 11:45 PM
Use the Spark context instead of the Hive context.
Apache Spark 2.3
Below is the code for your reference:
-------------------------------------
Read a Kudu table from PySpark with the code below:
kuduDF = spark.read.format('org.apache.kudu.spark.kudu').option('kudu.master',"IP of master").option('kudu.table',"impala::TABLE: name").load()
kuduDF.show(5)
Write to a Kudu table with the code below:
DF.write.format('org.apache.kudu.spark.kudu').option('kudu.master',"IP of master").option('kudu.table',"impala::TABLE: name").mode("append").save()
----------------------------------------
Reference link: https://medium.com/@sciencecommitter/how-to-read-from-and-write-to-kudu-tables-in-pyspark-via-impala...
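The read and write calls above share the same format string and options, so they can be wrapped in small helpers. This is only a rough sketch, not tested against a cluster: the function names, the host name, and the makt_db / makt_table names are hypothetical, and it assumes the table was created through Impala, in which case Kudu stores it as impala::<db>.<table>. 7051 is the default Kudu master port.

```python
KUDU_FORMAT = "org.apache.kudu.spark.kudu"


def kudu_options(kudu_master, impala_db, impala_table):
    """Build the option map shared by the reader and the writer."""
    return {
        "kudu.master": kudu_master,
        # Assumption: the table was created via Impala, so its Kudu name
        # carries the impala:: prefix followed by <db>.<table>.
        "kudu.table": "impala::%s.%s" % (impala_db, impala_table),
    }


def read_kudu(spark, options):
    """Load a Kudu table as a DataFrame through the Spark session."""
    return spark.read.format(KUDU_FORMAT).options(**options).load()


def append_to_kudu(df, options):
    """Write a DataFrame to a Kudu table in append mode."""
    df.write.format(KUDU_FORMAT).options(**options).mode("append").save()


# Hypothetical host, database and table names.
opts = kudu_options("kudu-master.example.com:7051", "makt_db", "makt_table")
print(opts["kudu.table"])
```

In a PySpark shell you would then call read_kudu(spark, opts).show(5) or append_to_kudu(csv_df, opts). Both go through the DataFrame source directly, not through a SQL statement against the Hive metastore table.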
If you want to use Scala instead, the reference link is below:
Created on 02-18-2021 06:03 AM - edited 02-18-2021 06:05 AM
Please add kudu-spark2-tools_2.11-1.6.0.jar, or the build that matches your Spark and Scala versions.
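One way to add the jar is the --jars flag of spark-submit. The sketch below only builds the command line. The script name and jar path are hypothetical, and spark2-submit is assumed to be the launcher name on your cluster (use spark-submit if that is what you have).

```python
def spark_submit_args(script_path, jar_paths, submit_bin="spark2-submit"):
    """Return the argument list for submitting a PySpark script with extra jars."""
    # --jars takes a comma-separated list of jar files.
    return [submit_bin, "--jars", ",".join(jar_paths), script_path]


# Hypothetical script name and jar location.
args = spark_submit_args(
    "upsert_to_kudu.py",
    ["/opt/jars/kudu-spark2-tools_2.11-1.6.0.jar"],
)
print(" ".join(args))
```

The resulting list can be passed to subprocess.call(args), or the printed line can be run from a shell.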