Cloudera Employee

Accessing Kudu tables from Spark on the Cloudera Data Platform (CDP) is a common integration pattern for real-time analytic workloads. These workloads need fast inserts and updates as well as efficient columnar scans, all served from a single storage layer.

This article describes how Spark jobs running in the Cloudera Data Engineering (CDE) service can read from and write to Kudu storage provided by a CDP Datahub cluster.

Kudu Cluster

We'll first configure the Kudu cluster by creating a Datahub cluster of type Real-time Data Mart.

Once the cluster has been provisioned successfully, you may need to add your IP address to the firewall rules for the cluster hosts in order to access some of the cluster's interfaces. For example, with Amazon Web Services (AWS), the Datahub cluster page links to the EC2 instance configuration page. On that page, locate the Security configuration and edit Security Group > Inbound Rules.
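
The article edits the inbound rule in the AWS console. If you prefer to script that step, the following is a minimal sketch of how a single inbound rule could be described. The function name build_ingress_rule, the example address 203.0.113.25, and port 443 are assumptions for illustration and do not come from the steps above; use the address and port that apply to the interface you need to reach.

```python
import ipaddress


def build_ingress_rule(client_ip, port, description="my workstation"):
    """Return one inbound-rule entry allowing a single IPv4 address on one TCP port."""
    # Validate the address and express it as a /32 CIDR block (exactly one host).
    cidr = f"{ipaddress.IPv4Address(client_ip)}/32"
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": cidr, "Description": description}],
    }


# Hypothetical values for illustration only.
rule = build_ingress_rule("203.0.113.25", 443)
print(rule)
```

A dictionary of this shape is what boto3's EC2 client expects in the IpPermissions list of authorize_security_group_ingress, alongside the GroupId of the security group. Restricting the rule to a /32 keeps access limited to one address.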

Kudu Table

Next, we'll set up a small Kudu table for testing. An easy way to do this is from the Hue Web UI, which is linked from the Services section of the Datahub cluster page. Once logged into the Hue Impala Editor, run the following SQL to create the table and insert one record:

CREATE TABLE IF NOT EXISTS default.cde_kudu_table(
  k BIGINT,
  a STRING,
  b BIGINT,
  PRIMARY KEY(k)
)
PARTITION BY HASH (k)
PARTITIONS 2
STORED AS KUDU;

INSERT INTO default.cde_kudu_table VALUES (1, 'aaa', 111);

CDE Spark Job

Next, we'll use a CDE Spark job to write to and read from the Kudu table that was just created.

Copy the hostnames (FQDNs) of the three Datahub (Kudu) cluster master nodes from the Hardware tab of the Datahub cluster page.

Next, edit the kudu_master variable in the PySpark sample code below, replacing each <<datahub-master*-hostname>> placeholder with one of the master node FQDNs noted in the previous step, and then save the file as cde_kudu.py:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Comma-separated list of Kudu master addresses (host:port)
kudu_master="\
<<datahub-master0-hostname>>:7051,\
<<datahub-master1-hostname>>:7051,\
<<datahub-master2-hostname>>:7051"

# Sample rows; the column names match the Kudu table columns (a, b, k)
df_in = spark.createDataFrame([('abc', 123, 1), ('xyz', 789, 10), ('zzz', 10000, 99)], ('a', 'b', 'k'))

# Upsert the rows: existing primary keys are updated, new keys are inserted
df_in.write.format('org.apache.kudu.spark.kudu')\
    .option('kudu.master', kudu_master)\
    .option('kudu.table', 'impala::default.cde_kudu_table')\
    .option('kudu.operation', 'upsert')\
    .mode('append')\
    .save()

# Read the table back into a DataFrame
df_out = spark.read.format('org.apache.kudu.spark.kudu')\
    .option('kudu.master', kudu_master)\
    .option('kudu.table', 'impala::default.cde_kudu_table')\
    .load()
# show() prints the rows itself and returns None, so it is not wrapped in print()
df_out.show()

spark.stop()
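
As an alternative to editing the backslash-continued string by hand, the kudu_master value could be assembled from a list of hostnames. This is a minimal sketch: the helper name build_kudu_master and the example.com hostnames are placeholders for illustration, and the port defaults to the 7051 used in the sample above.

```python
def build_kudu_master(hostnames, port=7051):
    """Join master hostnames into a comma-separated 'host:port' list."""
    # Drop surrounding whitespace and skip empty entries from copy-and-paste.
    cleaned = [h.strip() for h in hostnames if h.strip()]
    if not cleaned:
        raise ValueError("at least one Kudu master hostname is required")
    return ",".join(f"{h}:{port}" for h in cleaned)


# Placeholder hostnames; substitute the FQDNs copied from the Hardware tab.
kudu_master = build_kudu_master([
    "master0.example.com",
    "master1.example.com ",
    "master2.example.com",
])
print(kudu_master)
```

The resulting string can be passed unchanged to the kudu.master option in both the write and the read.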

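Finally, run the sample code using the CDE CLI. Adjust the Kudu package coordinates if required: the artifact name encodes the Spark and Scala versions (kudu-spark2_2.11 targets Spark 2 with Scala 2.11), and the trailing version is the Kudu release.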

cde spark submit cde_kudu.py --packages org.apache.kudu:kudu-spark2_2.11:1.13.0

The job updates the values for the existing row (k=1), inserts new rows for k=10 and k=99, and then reads the resulting table back as a DataFrame. The output appears in the stdout log of the job's driver process:

+---+---+-----+
|  k|  a|    b|
+---+---+-----+
|  1|abc|  123|
| 10|xyz|  789|
| 99|zzz|10000|
+---+---+-----+
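
To see why the job ends with three rows, the upsert behaviour can be mimicked in plain Python. This is only an illustrative model, not Kudu's implementation: the table is represented as a dictionary keyed on the primary key k, and the upsert function is a hypothetical helper. The starting row is the one inserted from Hue, and the incoming rows are those in the sample DataFrame.

```python
def upsert(table, rows, key="k"):
    """Model an upsert on a table held as {primary_key: row}."""
    result = {pk: dict(row) for pk, row in table.items()}
    for row in rows:
        # A matching key has its supplied columns overwritten; a new key is inserted.
        result[row[key]] = {**result.get(row[key], {}), **row}
    return result


# Row created by the INSERT statement run in Hue.
existing = {1: {"k": 1, "a": "aaa", "b": 111}}

# Rows written by the Spark job.
incoming = [
    {"k": 1, "a": "abc", "b": 123},
    {"k": 10, "a": "xyz", "b": 789},
    {"k": 99, "a": "zzz", "b": 10000},
]

merged = upsert(existing, incoming)
for pk in sorted(merged):
    print(merged[pk])
```

With a plain insert instead of an upsert, the row with k=1 would collide with the existing primary key rather than replace it, which is why the sample job sets kudu.operation to upsert.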
