Spark3 Kudu Integration

 

This article walks through integrating Spark 3 with Kudu step by step: creating a Kudu table through Impala, reading from and writing to it with Spark, and submitting Spark applications to a secure Kudu cluster.

Apache Kudu

Kudu is a distributed columnar storage engine optimized for OLAP workloads. Kudu runs on commodity hardware, is horizontally scalable, and supports highly available operations.

Kudu Integration with Spark

Apache Kudu can be integrated with Apache Spark using the built-in Spark-SQL-Kudu library. This integration allows us to read from and write to Kudu tables directly from Spark applications using Spark SQL.

Kudu integrates with Spark through the Data Source API as of version 1.0.0. To integrate Kudu with Spark 3, use the kudu-spark3_2.12 library.

The following steps integrate Kudu with Spark 3:

 

Impala/Kudu:

Step 1: Launch the impala-shell

In Cloudera Manager, go to Impala > Status, copy the Impala Shell Command, and run it from a terminal.

For example,

 

 

impala-shell -i node1 -d default -k --ssl --ca_cert=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_cacerts.pem

 

 

Step 2: Create the Kudu table.

 

 

CREATE TABLE employees
(
  id BIGINT,
  name STRING,
  age SMALLINT,
  salary FLOAT,
  PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

 

 

Step 3: Insert the data into the Kudu table

 

 

INSERT INTO employees VALUES (1, "Ranga", 28, 10000), (2, "Nishanth", 5, 40000), (3, "Meena", 30, 24000);

 

 

Step 4: Verify the data from the Kudu table

 

 

SELECT * FROM employees;

 

 

Step 5: Quit the Impala shell

 

 

quit;

 

 

Spark:

Pre-requisites:

  1. Verify the user has proper permission(s) to access the table/database to avoid the following exception:
    org.apache.kudu.client.NonRecoverableException: Unauthorized action
      at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
      at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:470)
      at org.apache.kudu.client.KuduClient.openTable(KuduClient.java:288)
      at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:327)
      at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
      at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
      at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
      at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
  2. To grant the required permissions, go to the Ranger UI > Access Manager > KUDU, click cm_kudu, and then use an existing policy or create a new one that gives the user the required permissions.
  3. To collect the Kudu master values, go to Cloudera Manager > Kudu > Instances and copy the hostname of every instance whose Role Type is Master.
  4. You must specify every Kudu master in the cluster; otherwise, an exception similar to the following occurs:
    23/05/15 12:59:38 WARN  client.ConnectToCluster: [kudu-nio-0]: Could not connect to a leader master. Client configured with 1 master(s) (node1:7051) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
    java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Could not connect to a leader master. Client configured with 1 master(s) (node1) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:360)
      at org.apache.kudu.spark.kudu.KuduContext.<init>(KuduContext.scala:169)
      at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:325)
      at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
      at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
      at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
      at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
      ... 47 elided
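
The master list requirement above can be checked before the Spark job starts. The following is a minimal sketch in plain Python; the helper name build_kudu_master and its expected_count parameter are illustrative assumptions, not part of Kudu or Spark. It joins the hostnames copied from Cloudera Manager into the comma-separated kudu.master value and adds port 7051 (the port used in this article) when a hostname has none.

```python
DEFAULT_KUDU_MASTER_PORT = 7051  # port used by the master addresses in this article


def build_kudu_master(hosts, expected_count=None):
    """Join master hostnames into the comma-separated kudu.master value.

    Hypothetical helper: `expected_count` is the number of masters shown in
    Cloudera Manager; pass it to fail early if a master was left out.
    """
    addresses = []
    for host in hosts:
        host = host.strip()
        if not host:
            continue
        # Add the port when the hostname was copied without one.
        if ":" not in host:
            host = f"{host}:{DEFAULT_KUDU_MASTER_PORT}"
        # Skip duplicates so a repeated host does not hide a missing one.
        if host not in addresses:
            addresses.append(host)
    if not addresses:
        raise ValueError("at least one Kudu master is required")
    if expected_count is not None and len(addresses) != expected_count:
        raise ValueError(
            f"expected {expected_count} master(s) but got {len(addresses)}: {addresses}"
        )
    return ",".join(addresses)


kudu_master = build_kudu_master(["node1", "node2:7051"], expected_count=2)
print(kudu_master)
```

The resulting string can be passed as the kudu.master option in the Spark code later in this article.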

 

 

Step 6: Launch spark3-shell with the kudu-spark3 connector jar.

There are two ways to pass the kudu-spark3 connector jar to spark3-shell: 

  1. Using Jars:
    spark3-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
  2. Using Packages:
    spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/

 

 

You can use either of the above options.

For <kudu-cdp-version>, check Cloudera Runtime component versions in Release Notes.

 

Step 7: Run the Spark code to read data from and write data to Kudu.

 

Note: Before running the following code, replace the kudu_master value with the Kudu master addresses collected in the prerequisites.

 

 

val kudu_table = "default.employees"
val kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"

// Load the data from kudu
val df = spark.read.options(Map("kudu.master" -> kudu_master, "kudu.table" -> kudu_table)).format("kudu").load()

// Display the data
df.show()

// Create sample dataset to insert
case class Employee(id: Long, name: String, age: Short, salary: Float)

val employeeDF = Seq(
  Employee(4L, "Employee6", 56, 1500.5f),
  Employee(5L, "Employee7", 30, 15000.5f)
).toDF()

// Save the data to kudu
employeeDF.write.options(Map("kudu.master"-> kudu_master, "kudu.table"-> kudu_table)).mode("append").format("kudu").save()

 

 

Note: If the above code fails with an error such as table not found, prefix the kudu_table value with impala::. For example, val kudu_table = "impala::default.employees"
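
The fallback described in the note can be sketched in Python as follows. The helper names (with_impala_prefix, kudu_table_candidates, load_kudu_table) are hypothetical, and the loader catches a broad Exception because the exact error type raised for a missing table is an assumption not confirmed here. It tries the table name as given and then retries with the impala:: prefix.

```python
IMPALA_PREFIX = "impala::"


def with_impala_prefix(table):
    """Return the table name with the impala:: prefix, adding it only once."""
    return table if table.startswith(IMPALA_PREFIX) else IMPALA_PREFIX + table


def kudu_table_candidates(table):
    """Names to try, in order: the name as given, then the prefixed form."""
    candidates = [table]
    prefixed = with_impala_prefix(table)
    if prefixed != table:
        candidates.append(prefixed)
    return candidates


def load_kudu_table(spark, kudu_master, table):
    """Try each candidate name and return the first DataFrame that loads.

    `spark` is an existing SparkSession. The broad `except` is deliberate in
    this sketch; narrow it once you know which error your cluster raises.
    """
    last_error = None
    for name in kudu_table_candidates(table):
        try:
            return (
                spark.read.format("kudu")
                .option("kudu.master", kudu_master)
                .option("kudu.table", name)
                .load()
            )
        except Exception as error:
            last_error = error
    raise last_error


print(kudu_table_candidates("default.employees"))
```

Because the fallback hides the first failure, permission or connectivity errors surface only after both names have been tried; for troubleshooting, calling the reader directly with one name is easier to follow.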

 

PySpark Code:

filename: kudu_pyspark3_example.py

 

 

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Kudu Integration with Spark3").getOrCreate()

kudu_table = "default.employees"
kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
df = spark.read.option('kudu.master', kudu_master).option('kudu.table', kudu_table).format("kudu").load()
df.show()

spark.stop()
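
The Scala example also appends rows to the table, while the PySpark script above only reads. A PySpark equivalent of the write might look like the following sketch. The names kudu_options, EMPLOYEE_SCHEMA, append_employees, and main are illustrative, and the sample rows and master addresses are placeholders to replace with your own values.

```python
# Schema matching the employees table created in Step 2.
EMPLOYEE_SCHEMA = "id BIGINT, name STRING, age SMALLINT, salary FLOAT"


def kudu_options(kudu_master, kudu_table):
    """Options shared by Kudu reads and writes."""
    return {"kudu.master": kudu_master, "kudu.table": kudu_table}


def append_employees(spark, rows, kudu_master, kudu_table):
    """Build a DataFrame from (id, name, age, salary) tuples and append it to Kudu."""
    df = spark.createDataFrame(rows, schema=EMPLOYEE_SCHEMA)
    (
        df.write.format("kudu")
        .options(**kudu_options(kudu_master, kudu_table))
        .mode("append")
        .save()
    )


def main():
    # Imported here so the helpers above can be loaded without PySpark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("Kudu Integration with Spark3").getOrCreate()
    kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"  # placeholder
    rows = [(6, "Employee6", 56, 1500.5), (7, "Employee7", 30, 15000.5)]  # sample data
    append_employees(spark, rows, kudu_master, "default.employees")
    spark.stop()

# Call main() at the end of the script when running it with spark3-submit.
```

Keeping the options in one helper means the read and write paths cannot drift apart on the master list or table name.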

 

 

Using Spark with a Secure Kudu Cluster

The Kudu Spark integration is able to operate on secure Kudu clusters which have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials.

Client mode:

To submit the Spark application in client mode, the submitting user must have an active Kerberos ticket granted through kinit.

 

 

kinit
spark3-submit \
--master yarn \
--deploy-mode client \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
kudu_pyspark3_example.py

 

 

 

Note: In client mode, the user must authenticate with kinit only. If you specify a keytab and principal, the following exception is thrown.

 

 

java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]

 

 

 

Cluster mode:

To submit the Spark application in cluster mode, the Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark3-submit.

 

 

spark3-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
  --principal user/node1.hadoop.com@HADOOP.COM \
  --keytab user.keytab \
  kudu_pyspark3_example.py

 

 

 

Note: In cluster mode, the user must authenticate with a keytab and principal only. If you authenticate with kinit instead, the following exception is thrown.

 

 

java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
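
The two authentication rules above (kinit only in client mode, keytab and principal only in cluster mode) can be enforced when the command line is assembled. The following is a small sketch in plain Python; the function build_submit_command is hypothetical, and the jar path, principal, and keytab values are the ones used in this article's examples.

```python
import shlex

KUDU_SPARK3_JAR = "/opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar"


def build_submit_command(script, deploy_mode, principal=None, keytab=None,
                         jar=KUDU_SPARK3_JAR):
    """Return the spark3-submit argument list for the given deploy mode.

    Hypothetical helper: it only assembles arguments and does not run anything.
    """
    if deploy_mode not in ("client", "cluster"):
        raise ValueError(f"unsupported deploy mode: {deploy_mode}")
    command = [
        "spark3-submit",
        "--master", "yarn",
        "--deploy-mode", deploy_mode,
        "--jars", jar,
    ]
    if deploy_mode == "cluster":
        # Cluster mode: credentials must come from --principal and --keytab.
        if not (principal and keytab):
            raise ValueError("cluster mode requires both principal and keytab")
        command += ["--principal", principal, "--keytab", keytab]
    elif principal or keytab:
        # Client mode: rely on the ticket obtained with kinit instead.
        raise ValueError("client mode: run kinit and omit principal/keytab")
    command.append(script)
    return command


print(shlex.join(build_submit_command("kudu_pyspark3_example.py", "client")))
print(shlex.join(build_submit_command(
    "kudu_pyspark3_example.py", "cluster",
    principal="user/node1.hadoop.com@HADOOP.COM", keytab="user.keytab",
)))
```

The returned list can be handed to subprocess.run as is; in client mode, run kinit first so that a valid ticket exists.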

 

 

Resources

1. Developing Applications With Apache Kudu

2. Kudu integration with Spark

 

 
