A common request from CDE users is the ability to specify a timeout (or SLA) for their Spark jobs. This is easily configured by leveraging CDE's embedded Airflow sub-service, which provides a rich set of workflow management and scheduling features, along with Cloudera Data Platform (CDP)-specific operators such as CDEJobRunOperator and CDWOperator.
As a simple example, the steps below create a CDE Spark job and trigger it from a CDE Airflow Task, using the execution_timeout Airflow Task parameter to specify an SLA after which the Spark job will be killed.
We'll use the pi.py sample code as our CDE Spark job, together with the Airflow DAG definition airflow_dag_with_task_timeout.py, both shown below.
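For reference, here is a sketch of pi.py, closely following the standard Spark Pi estimation example shipped with Apache Spark (the exact sample file you upload may differ slightly):

import sys
from operator import add
from random import random

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Estimate Pi by sampling random points in the unit square
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def inside(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions) \
                               .map(inside) \
                               .reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

The Airflow DAG definition, airflow_dag_with_task_timeout.py, is as follows: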
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

default_args = {
    'owner': 'cdeuser1',
    'retry_delay': timedelta(seconds=5),
    'start_date': datetime(2021, 1, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'run_pi_py_dag',
    default_args=default_args,
    catchup=False,
    is_paused_upon_creation=False
)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# Trigger the 'pi-py' CDE Spark job; execution_timeout kills the task
# (and the underlying CDE job) if it runs for longer than 120 seconds.
pi_py = CDEJobRunOperator(
    task_id='pi_py_task',
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120)
)

start >> pi_py >> end
In particular, the execution_timeout Airflow Task parameter will result in our CDE job being killed if it does not complete within the specified SLA time (120 seconds). Airflow provides a wide range of other Task parameters that can be set in the same way.
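For illustration only (this variant is not part of the original example), other commonly used BaseOperator parameters such as retries and retry_delay can be combined with execution_timeout on the same task:

# Sketch only: the same CDE task with a few additional BaseOperator parameters.
pi_py_with_retries = CDEJobRunOperator(
    task_id='pi_py_task_with_retries',
    dag=dag,
    job_name='pi-py',
    execution_timeout=timedelta(seconds=120),  # kill any single attempt after 120 seconds
    retries=2,                                 # retry the task up to twice on failure
    retry_delay=timedelta(seconds=30)          # wait 30 seconds between attempts
)

With these settings, each attempt is still subject to the 120-second timeout, and a timed-out or failed attempt is retried up to two more times before the task is marked as failed.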
Next, we will define a CDE Resource containing both the Airflow DAG definition and the Spark application file: