Member since
06-06-2018
1
Post
0
Kudos Received
0
Solutions
06-06-2018
04:56 AM
Version Details: ---------------- Cloudera - CDH 5.14 Apache Impala - impala-2.11.0+cdh5.14.0+0 Apache Kudu - kudu-1.6.0+cdh5.14.0+0 Apache Spark - spark-1.6.0+cdh5.14.0+537
Problem trying to solve: ------------------------ We are having a change records (delta records) in HDFS Hive table. Trying to upsert these change records table which have updates and inserts into a Kudu table using PySpark 1.6.
Steps executed: --------------- Case 1: csv_df = hive_context.read.format("com.databricks.spark.csv").load("%s" % csv_file_path) csv_df.write.format('org.apache.kudu.spark.kudu').option('kudu.master', kudu_master).option('kudu.table',kudu_table_name).mode("append").save()
Case 2: In the second case we tried to create a temptable from the dataframe and tried to insert the same into the kudu table. csv_df.registerTempTable('makt_kudu_temp') sqlContext.sql("""insert into <makt_db_name>.<makt_table_name> select * from makt_kudu_temp""")
In both the cases we got the below error. Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/context.py", line 580, in sql return DataFrame(self._ssql_ctx.sql(sqlQuery), self) File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/pyspark/sql/utils.py", line 45, in deco return f(*a, **kw) File "/hadoop1/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o48.sql. : java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.kudu.mapreduce.KuduTableInputFormat at org.apache.hadoop.hive.ql.metadata.Table.getInputFormatClass(Table.java:308) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:358) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getTableOption$1$$anonfun$3.apply(ClientWrapper.scala:341) at scala.Option.map(Option.scala:145)
2. We are getting the error with "Upsert" also when we tried to perform the operation.(Is upsert do not run by sqlContext).
At last, We used CASTING also in each and every column of the table but that also did not help.
... View more
Labels:
- Labels:
-
Apache Hive
-
Apache Kudu
-
Apache Spark
-
HDFS