Created on 07-11-2023 11:03 PM
Spark3 Kudu Integration
This blog post walks through integrating Spark 3 with Kudu, with step-by-step instructions for reading from and writing to Kudu tables from Spark applications.
Apache Kudu
Kudu is a distributed columnar storage engine optimized for OLAP workloads. Kudu runs on commodity hardware, is horizontally scalable, and supports highly available operations.
Kudu Integration with Spark
Apache Kudu integrates with Apache Spark through the Spark Data Source API (available since Kudu 1.0.0), which lets Spark applications read from and write to Kudu tables directly using Spark SQL. For Spark 3, the integration is provided by the kudu-spark3_2.12 library.
Below are step-by-step instructions to integrate Kudu with Spark 3:
Impala/Kudu:
Step 1: Launch the impala-shell
Go to the Cloudera Manager > Impala > Status > Copy the Impala Shell Command and run the command from the shell.
For example,
impala-shell -i node1 -d default -k --ssl --ca_cert=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_cacerts.pem
Step 2: Create the Kudu table.
CREATE TABLE employees
(
id BIGINT,
name STRING,
age SMALLINT,
salary FLOAT,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
Step 3: Insert the data into the Kudu table
INSERT INTO employees VALUES (1, "Ranga", 28, 10000), (2, "Nishanth", 5, 40000), (3, "Meena", 30, 24000);
Step 4: Verify the data from the Kudu table
SELECT * FROM employees;
Step 5: Quit the Impala shell
quit;
Spark:
Pre-requisites:
- Verify the user has proper permission(s) to access the table/database to avoid the following exception:
org.apache.kudu.client.NonRecoverableException: Unauthorized action
    at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
    at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:470)
    at org.apache.kudu.client.KuduClient.openTable(KuduClient.java:288)
    at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:327)
    at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
    at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
- To grant the required permissions, go to the Ranger UI > Access Manager > KUDU, click cm_kudu, and either use an existing policy or create a new one with the required permissions.
- You must provide the complete list of Kudu master addresses; otherwise, exceptions similar to the following will occur.
- To collect the Kudu master values, go to Cloudera Manager > Kudu > Instances and copy the hostnames whose Role Type is Master.
23/05/15 12:59:38 WARN client.ConnectToCluster: [kudu-nio-0]: Could not connect to a leader master. Client configured with 1 master(s) (node1:7051) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Could not connect to a leader master. Client configured with 1 master(s) (node1) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.kudu.spark.kudu.KuduContext.<init>(KuduContext.scala:169)
    at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:325)
    at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
    at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
    ... 47 elided
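Since every master must appear in the connection string, a small helper can assemble the kudu.master value from the hostnames copied out of Cloudera Manager. This is an illustrative sketch only; the hostnames are hypothetical placeholders, and 7051 is Kudu's default master RPC port:

```python
# Build the kudu.master connection string from a list of master hostnames.
# Hostnames below are placeholders; replace them with the Role Type "Master"
# hosts collected from Cloudera Manager.
DEFAULT_KUDU_MASTER_PORT = 7051

def build_kudu_master(hostnames, port=DEFAULT_KUDU_MASTER_PORT):
    """Join each master hostname as host:port, comma-separated."""
    return ",".join(f"{host}:{port}" for host in hostnames)

print(build_kudu_master(["node1", "node2"]))  # node1:7051,node2:7051
```

Passing the resulting string as the kudu.master option avoids the master-count mismatch shown in the warning above.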
Step 6: Launch the spark3-shell by passing the kudu-spark3 jar.
There are two ways to pass the kudu-spark3 connector jar to spark3-shell:
- Using Jars:
spark3-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
- Using Packages:
spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
Either option works.
For <kudu-cdp-version>, check the Cloudera Runtime component versions in the Release Notes.
Step 7: Run the Spark code to retrieve data from Kudu.
Note: Before running the following code, replace the kudu_master value with the master addresses collected in the prerequisites.
val kudu_table = "default.employees"
val kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
// Load the data from kudu
val df = spark.read.options(Map("kudu.master" -> kudu_master, "kudu.table" -> kudu_table)).format("kudu").load()
// Display the data
df.show()
// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(4L, "Employee6", 56, 1500.5f),
Employee(5L, "Employee7", 30, 15000.5f)
).toDF()
// Save the data to kudu
employeeDF.write.options(Map("kudu.master"-> kudu_master, "kudu.table"-> kudu_table)).mode("append").format("kudu").save()
Note: If running the above code fails with a table-not-found error, prepend impala:: to the kudu_table value. For example: val kudu_table = "impala::default.employees"
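The fallback described in this note can be captured in a small helper. The sketch below is illustrative only; the function name is an assumption for this post, not part of the Kudu API:

```python
# Tables created through Impala are registered in Kudu under an "impala::"
# prefix, so looking up a plain "db.table" name can fail with "table not
# found". qualify_kudu_table is a made-up helper name for illustration.
def qualify_kudu_table(table_name, prefix="impala::"):
    """Add the impala:: prefix unless the name already carries it."""
    if table_name.startswith(prefix):
        return table_name
    return prefix + table_name

print(qualify_kudu_table("default.employees"))  # impala::default.employees
```

The qualified name can then be passed as the kudu.table option in either the Scala or PySpark examples.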
PySpark Code:
filename: kudu_pyspark3_example.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kudu Integration with Spark3").getOrCreate()
kudu_table = "default.employees"
kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
df = spark.read.option("kudu.master", kudu_master).option("kudu.table", kudu_table).format("kudu").load()
df.show()
spark.stop()
Using Spark with a Secure Kudu Cluster
The Kudu Spark integration can operate on secure Kudu clusters that have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials.
Client mode:
To submit the Spark application in client mode, the submitting user must have an active Kerberos ticket granted through kinit.
kinit
spark3-submit \
--master yarn \
--deploy-mode client \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
kudu_pyspark3_example.py
Note: In client mode, the user must be authenticated using kinit only. If you specify --keytab and --principal, the job fails with the following exception.
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Cluster mode:
To submit the Spark application in cluster mode, the Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark3-submit.
spark3-submit \
--master yarn \
--deploy-mode cluster \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
--principal user/node1.hadoop.com@HADOOP.COM \
--keytab user.keytab \
kudu_pyspark3_example.py
Note: In cluster mode, the user must be authenticated using --keytab and --principal only. If you authenticate using kinit instead, the job fails with the following exception.
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Resources
1. Developing Applications With Apache Kudu
2. Kudu integration with Spark