Accessing Kudu tables from Spark on the Cloudera Data Platform (CDP) is a common integration pattern for real-time analytic workloads that require fast inserts and updates alongside efficient columnar scans over a single storage layer.
This article describes Kudu integration for Spark jobs running in the Cloudera Data Engineering (CDE) Service, using Kudu storage provided by a CDP Datahub cluster.
Kudu Cluster
We'll first configure the Kudu cluster. This can be done by creating a Datahub cluster of type Real-time Data Mart.
Once the cluster has been provisioned successfully, you may need to add your IP address to the firewall rules for the cluster hosts in order to access some of the cluster's interfaces. For example, with Amazon Web Services (AWS), a link to the EC2 instance configuration page is provided on the Datahub cluster page (on the EC2 instance page, locate the Security configuration and edit Security Group > Inbound Rules).
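If you prefer to script this step instead of using the AWS console, the equivalent API call can be made with boto3. The sketch below is illustrative only; the region, security group ID, port, and IP address are placeholders, and the port(s) you open depend on which cluster interfaces you need to reach:
# Illustrative sketch (not from the original article): add an inbound rule to the
# Datahub hosts' security group for your workstation IP using boto3.
# The region, security group ID, port, and CIDR below are placeholders.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')
ec2.authorize_security_group_ingress(
    GroupId='sg-0123456789abcdef0',                  # placeholder security group ID
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 443,                             # example: HTTPS for web UIs such as Hue
        'ToPort': 443,
        'IpRanges': [{'CidrIp': '203.0.113.10/32'}]  # placeholder: your public IP
    }]
)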
Kudu Table
Next, we'll set up a small Kudu table for testing. An easy way to do this is from the Hue Web UI (a link to Hue is provided under the Datahub cluster's Services section). Once logged into the Hue Impala Editor, run the following SQL to create the table and insert one record:
CREATE TABLE IF NOT EXISTS default.cde_kudu_table (
  k BIGINT,
  a STRING,
  b BIGINT,
  PRIMARY KEY(k)
)
PARTITION BY HASH (k) PARTITIONS 2
STORED AS KUDU;

INSERT INTO default.cde_kudu_table VALUES (1, 'aaa', 111);
CDE Spark Job
Next, we'll interact with the newly created Kudu table from a CDE Spark job.
Copy the three Datahub (Kudu) cluster master node hostnames (FQDNs) listed under the cluster's Hardware tab.
Next, edit the PySpark kudu_master variable in the sample code below by replacing the <<hostnames>> with the master node FQDNs noted in the previous step, and then save the file as cde_kudu.py:
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Comma-separated list of Kudu master addresses (replace with your Datahub master FQDNs)
kudu_master="\
<<datahub-master0-hostname>>:7051,\
<<datahub-master1-hostname>>:7051,\
<<datahub-master2-hostname>>:7051"

# Small test DataFrame; column 'k' matches the Kudu table's primary key
df_in = spark.createDataFrame([('abc', 123, 1), ('xyz', 789, 10), ('zzz', 10000, 99)], ('a', 'b', 'k'))

# Upsert the rows into the Kudu table (tables created through Impala use the 'impala::' prefix)
df_in.write.format('org.apache.kudu.spark.kudu')\
    .option('kudu.master', kudu_master)\
    .option('kudu.table', 'impala::default.cde_kudu_table')\
    .option('kudu.operation', 'upsert')\
    .mode('append')\
    .save()

# Read the table back as a DataFrame and print it to the driver's stdout
df_out = spark.read.format('org.apache.kudu.spark.kudu')\
    .option('kudu.master', kudu_master)\
    .option('kudu.table', 'impala::default.cde_kudu_table')\
    .load()
df_out.show()

spark.stop()
Finally, run the sample code using the CDE CLI (adjust the Kudu package versions, if required):
cde spark submit cde_kudu.py --packages org.apache.kudu:kudu-spark2_2.11:1.13.0
The job will upsert the row with k=1 (updating its values), insert new rows for k=10 and k=99, and then read the resulting table back as a DataFrame (output to the job's driver process stdout log):
+---+---+-----+
|  k|  a|    b|
+---+---+-----+
|  1|abc|  123|
| 10|xyz|  789|
| 99|zzz|10000|
+---+---+-----+
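As an optional sanity check (not part of the original job), you can extend cde_kudu.py with a filtered read, placed before spark.stop(), to confirm that the upsert updated the existing primary key; this sketch reuses the spark session and kudu_master variable defined above:
# Optional sketch: confirm the row with primary key k=1 was updated by the upsert.
# Reuses the 'spark' session and 'kudu_master' variable from cde_kudu.py above.
df_check = spark.read.format('org.apache.kudu.spark.kudu')\
    .option('kudu.master', kudu_master)\
    .option('kudu.table', 'impala::default.cde_kudu_table')\
    .load()
df_check.filter("k = 1").show()  # expected: a = 'abc', b = 123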
A common request from CDE users is the ability to specify a timeout (or SLA) for their Spark jobs. This is easily configured by leveraging CDE's embedded Airflow sub-service, which provides a rich set of workflow management and scheduling features, along with Cloudera Data Platform (CDP)-specific operators such as CDEJobRunOperator and CDWOperator.
As a simple example, the steps below create a CDE Spark job and trigger it from a CDE Airflow task, using the execution_timeout task parameter to specify an SLA after which the Spark job is killed.
We'll use the pi.py sample code as our CDE Spark job, together with the following Airflow DAG definition (airflow_dag_with_task_timeout.py):
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'cdeuser1',
    'retry_delay': timedelta(seconds=5),
    'start_date': datetime(2021, 1, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'run_pi_py_dag',
    default_args=default_args,
    catchup=False,
    is_paused_upon_creation=False
)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# Trigger the CDE Spark job named 'pi-py'; kill it if it runs longer than 120 seconds
pi_py = CDEJobRunOperator(
    task_id='pi_py_task',
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120)
)

start >> pi_py >> end
In particular, the execution_timeout task parameter causes our CDE job to be killed if it does not complete within the specified SLA (120 seconds). Airflow provides a wide range of other task parameters.
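For illustration, here is a hedged sketch of how a few other standard task parameters (core Airflow BaseOperator arguments, not CDE-specific ones) could be combined on the same operator; the task id and values shown are arbitrary examples, not part of the DAG above:
# Illustrative sketch only: combining other standard Airflow task parameters
# (core BaseOperator arguments) with CDEJobRunOperator. Values are examples.
pi_py_with_retries = CDEJobRunOperator(
    task_id='pi_py_task_with_retries',         # hypothetical task id for illustration
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120),  # hard timeout: kill the task after 120 seconds
    retries=2,                                 # retry the task up to twice on failure
    retry_delay=timedelta(seconds=30),         # wait 30 seconds between retries
    sla=timedelta(minutes=5)                   # record an SLA miss if not finished within 5 minutes
)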
Next, we will define a CDE Resource containing both the Airflow DAG definition and Spark application file:
cde resource create --name airflow-timeout-example
cde resource upload --name airflow-timeout-example --local-path ./airflow_dag_with_task_timeout.py --local-path ./pi.py
Create the CDE Spark job:
cde job create --name pi-py --mount-1-resource airflow-timeout-example --application-file pi.py --type spark
Create the CDE Airflow job (which triggers the Spark job):
cde job create --name airflow-timeout-test --mount-1-resource airflow-timeout-example --dag-file airflow_dag_with_task_timeout.py --type airflow
Finally, run the CDE Airflow job; the Spark job (task) will not run longer than the 2-minute timeout we specified:
cde job run --name airflow-timeout-test