A common request from CDE users is the ability to specify a timeout (or SLA) for their Spark job. This is easily configured by leveraging CDE's embedded Airflow sub-service, which provides a rich set of workflow management and scheduling features, along with Cloudera Data Platform (CDP)-specific operators such as CDEJobRunOperator and CDWOperator.

 

As a simple example, the steps below create a CDE Spark job and trigger it from a CDE Airflow Task, using the execution_timeout Airflow Task parameter to specify an SLA after which the Spark job is killed.

We'll use the pi.py sample code as our CDE Spark job, together with an Airflow DAG definition (airflow_dag_with_task_timeout.py); both are shown below.
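
For context, pi.py is the familiar Pi-estimation sample that ships with Spark; a minimal version looks roughly like the sketch below (the file bundled with your Spark distribution may differ slightly in detail):

import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Estimate Pi with a Monte Carlo simulation distributed across Spark partitions
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def inside(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(inside).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
    spark.stop()

And the Airflow DAG definition (airflow_dag_with_task_timeout.py):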

 

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

default_args = {
    'owner': 'cdeuser1',
    'retry_delay': timedelta(seconds=5),
    'start_date': datetime(2021,1,1,1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}
dag = DAG(
    'run_pi_py_dag',
    default_args=default_args,
    catchup=False,
    is_paused_upon_creation=False
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

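# Trigger the CDE Spark job 'pi-py'; kill the run and fail this Task if it
# exceeds the 120-second execution_timeout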
pi_py = CDEJobRunOperator(
    task_id='pi_py_task',
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120)
)

start >> pi_py >> end

 

In particular, the execution_timeout Airflow Task parameter will result in our CDE job being killed if it does not complete within the specified SLA time (120 seconds). Airflow provides a wide range of other Task parameters that can be combined with it.
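
As an illustration (this variant is not part of the walkthrough above), the same operator can combine the timeout with per-Task retry settings, so that a timed-out run is retried once before the Task is finally marked failed:

pi_py = CDEJobRunOperator(
    task_id='pi_py_task',
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120),  # kill the CDE job run after 120 seconds
    retries=1,                                 # retry once, overriding retries=0 from default_args
    retry_delay=timedelta(seconds=30)          # wait 30 seconds before the retry
)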

Next, we will define a CDE Resource containing both the Airflow DAG definition and the Spark application file:

 

cde resource create --name airflow-timeout-example
cde resource upload --name airflow-timeout-example --local-path ./airflow_dag_with_task_timeout.py --local-path ./pi.py

 

Configure the CDE Spark job:

 

cde job create --name pi-py --mount-1-resource airflow-timeout-example --application-file pi.py --type spark

 

Configure the CDE Airflow Job (which calls the Spark job):

 

cde job create --name airflow-timeout-test --mount-1-resource airflow-timeout-example --dag-file airflow_dag_with_task_timeout.py --type airflow

 

Finally, we can test the CDE Airflow Job, knowing that the CDE Spark job (Task) will not run longer than the 2-minute timeout we specified:

 

cde job run --name airflow-timeout-test
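
When the 120-second limit is exceeded, Airflow stops the Task (an AirflowTaskTimeout is raised) and marks it failed, killing the CDE job run as described above. If you also want to be alerted when that happens, a Task-level on_failure_callback can be attached; below is a minimal sketch, where the notify_timeout helper is hypothetical and stands in for whatever alerting mechanism you use:

def notify_timeout(context):
    # 'exception' in the callback context is an AirflowTaskTimeout when the
    # execution_timeout is exceeded; replace print() with real alerting
    print("Task %s failed: %s" % (context['task_instance'].task_id, context.get('exception')))

pi_py = CDEJobRunOperator(
    task_id='pi_py_task',
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120),
    on_failure_callback=notify_timeout
)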

 
