Created on 07-11-2023 11:03 PM
This blog post guides you through integrating Spark 3 with Kudu, with step-by-step instructions.
Kudu is a distributed columnar storage engine optimized for OLAP workloads. Kudu runs on commodity hardware, is horizontally scalable, and supports highly available operations.
Apache Kudu integrates with Apache Spark through the Data Source API (available since Kudu 1.0.0), which lets Spark applications read from and write to Kudu tables directly using Spark SQL and DataFrames. To integrate Kudu with Spark 3, use the kudu-spark3_2.12 connector library.
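As a quick illustration of this integration, the sketch below loads a Kudu table into a DataFrame and queries it with Spark SQL. This is a sketch only: the master addresses and table name are placeholders, and it assumes a spark3-shell session with the kudu-spark3_2.12 connector on the classpath.

```scala
// Sketch only: placeholder master addresses and table name - adjust for your cluster.
val df = spark.read
  .option("kudu.master", "master1:7051,master2:7051,master3:7051") // placeholder
  .option("kudu.table", "default.employees")                       // placeholder
  .format("kudu")
  .load()

// Register the DataFrame as a temporary view and query it with Spark SQL.
df.createOrReplaceTempView("employees")
spark.sql("SELECT name, salary FROM employees WHERE age > 25").show()
```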
Below are the step-by-step instructions to seamlessly integrate Kudu with Spark3:
Impala/Kudu:
Step 1: Launch the impala-shell
Go to the Cloudera Manager > Impala > Status > Copy the Impala Shell Command and run the command from the shell.
For example,
impala-shell -i node1 -d default -k --ssl --ca_cert=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_cacerts.pem
Step 2: Create the Kudu table.
CREATE TABLE employees
(
id BIGINT,
name STRING,
age SMALLINT,
salary FLOAT,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
Step 3: Insert the data into the Kudu table
INSERT INTO employees VALUES (1, "Ranga", 28, 10000), (2, "Nishanth", 5, 40000), (3, "Meena", 30, 24000);
Step 4: Verify the data from the Kudu table
SELECT * FROM employees;
Step 5: Quit the Impala shell
quit;
Spark:
Pre-requisites:
Ensure the user submitting the Spark job has the required privileges on the Kudu table (for example, through Ranger policies). Without sufficient privileges, reads from Spark fail with an error like the following:
org.apache.kudu.client.NonRecoverableException: Unauthorized action
at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:470)
at org.apache.kudu.client.KuduClient.openTable(KuduClient.java:288)
at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:327)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
Collect all the Kudu master addresses from the Kudu service in Cloudera Manager. The Spark client must be configured with the complete list of masters; if only a subset is specified, the connection fails with an error like the following:
23/05/15 12:59:38 WARN client.ConnectToCluster: [kudu-nio-0]: Could not connect to a leader master. Client configured with 1 master(s) (node1:7051) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Could not connect to a leader master. Client configured with 1 master(s) (node1) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.kudu.spark.kudu.KuduContext.<init>(KuduContext.scala:169)
at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:325)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
... 47 elided
Step 6: Launch the spark3-shell with the kudu-spark3 connector jar.
There are two ways to pass the kudu-spark3 connector jar to spark3-shell:
spark3-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
You can use either of the above options.
For <kudu-cdp-version>, check Cloudera Runtime component versions in Release Notes.
Step 7: Run the Spark code to retrieve data from Kudu.
Note: Before running the following code, replace the kudu.master value with the master addresses collected in the prerequisites.
val kudu_table = "default.employees"
val kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
// Load the data from kudu
val df = spark.read.options(Map("kudu.master" -> kudu_master, "kudu.table" -> kudu_table)).format("kudu").load()
// Display the data
df.show()
// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(4L, "Employee6", 56, 1500.5f),
Employee(5L, "Employee7", 30, 15000.5f)
).toDF()
// Save the data to kudu
employeeDF.write.options(Map("kudu.master"-> kudu_master, "kudu.table"-> kudu_table)).mode("append").format("kudu").save()
Note: If the above code fails with a "table not found" error, prepend impala:: to the table name, because tables created through Impala are registered in Kudu with that prefix. For example, val kudu_table = "impala::default.employees"
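In addition to the Data Source API used above, the connector exposes a KuduContext for row-level operations such as upserts and deletes. The following is a minimal sketch that reuses the kudu_master, kudu_table, and employeeDF values from Step 7; adjust it for your cluster before running.

```scala
import org.apache.kudu.spark.kudu.KuduContext

// Build a KuduContext from the master addresses and the active SparkContext.
val kuduContext = new KuduContext(kudu_master, spark.sparkContext)

// Upsert: inserts new rows, or updates existing rows by primary key.
kuduContext.upsertRows(employeeDF, kudu_table)

// Delete rows by primary key (the DataFrame should contain only the key columns).
kuduContext.deleteRows(employeeDF.select("id"), kudu_table)

// Check whether a table exists.
println(kuduContext.tableExists(kudu_table))
```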
PySpark Code:
filename: kudu_pyspark3_example.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kudu Integration with Spark3").getOrCreate()
kudu_table = "default.employees"
kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
df = spark.read.option('kudu.master', kudu_master).option('kudu.table', kudu_table).format("kudu").load()
df.show()
spark.stop()
The Kudu Spark integration is able to operate on secure Kudu clusters which have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials.
Client mode:
To submit the Spark application in client mode, the submitting user must have an active Kerberos ticket granted through kinit.
kinit
spark3-submit \
--master yarn \
--deploy-mode client \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
kudu_pyspark3_example.py
Note: In client mode, the user must be authenticated with kinit only. If you also specify --principal and --keytab, the job fails with the following exception:
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Cluster mode:
To submit the Spark application in cluster mode, the Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark3-submit.
spark3-submit \
--master yarn \
--deploy-mode cluster \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
--principal user/node1.hadoop.com@HADOOP.COM \
--keytab user.keytab \
kudu_pyspark3_example.py
Note: In cluster mode, the user must be authenticated with --keytab and --principal only. If you authenticate with kinit instead, the job fails with the following exception:
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
References:
1. Developing Applications With Apache Kudu
2. Kudu Integration with Spark