Cloudera Employee
Cloudera Data Engineering (CDE) is Cloudera's new Spark-as-a-Service offering on Public Cloud. It features Kubernetes auto-scaling of Spark workers for efficient cost optimization, a simple UI for job management, and an integrated Airflow scheduler for managing your production-grade workflows.
In this article, we will not focus on service deployment. For that, refer to Enabling Cloudera Data Engineering.
What we will cover is how to adjust your applications to fit CDE's paradigm, deploy them and schedule them by working through some example applications in Python and Scala.

Developing Apps for Cloudera Data Engineering

In CDE, an application is called a job. This takes the form of a main jar file or Python script. Depending on which language you wish to use, the development setup will be slightly different. We will go through those at a high level later.
For creating and managing jobs, you can interact with CDE through the UI or the CLI. We will mainly use the UI here. For instructions on setting up the CLI, refer to Using the Cloudera Data Engineering command line interface.
The CLI is needed for CI/CD integration and for access to some of the deeper functionality within CDE, but it is not needed for most day-to-day tasks. It can also be used by external applications to monitor the status of CDE jobs.

Structure of this repo

To support this article, we have developed some example applications. Refer to https://github.com/Data-drone/cde_data_ingest. The application code sits under 'src/main/python' and 'src/main/scala' respectively. Code for the example Airflow DAG sits under 'src/main/airflow'.

Writing Apps for CDE - Common Concepts

With CDE, the `SparkSession` settings for `executor` and `driver` should be set through the CDE options in the UI or CLI rather than in the application itself. Including them in the code as well can result in abnormal behavior.

 

 

from pyspark.sql import SparkSession

######## DON'T
### Standalone Spark / CDH Spark

spark = SparkSession \
    .builder \
    .appName("Load Data") \
    .config("spark.executor.cores", "6") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.instances", "2") \
    .enableHiveSupport() \
    .getOrCreate()

######## DO
### CDE Spark

spark = SparkSession \
    .builder \
    .appName("Load Data") \
    .enableHiveSupport() \
    .getOrCreate()

 

 

With the CDE CLI, these settings are passed as flags to the `cde job create` command.

 

 

# cde cli job for the above settings
cde job create --application-file <path-to-file> --name <job_name> --num-executors 2 --executor-cores 6 --executor-memory "10g" --type spark

## Note: the job name is a CDE property and doesn't correspond to appName
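When migrating an existing application, the sizing options that used to live in the `SparkSession` builder can be translated mechanically into CLI flags. The following is a minimal sketch of that translation. The helper name `cde_job_create_args` and the mapping table are illustrative assumptions, and only the flags shown in the command above are used.

```python
import shlex

# Spark properties that have a dedicated flag on `cde job create`.
# The flags are the ones shown in the article; the mapping itself is illustrative.
SIZING_FLAGS = {
    "spark.executor.instances": "--num-executors",
    "spark.executor.cores": "--executor-cores",
    "spark.executor.memory": "--executor-memory",
}


def cde_job_create_args(job_name, application_file, spark_conf):
    """Build the argument list for `cde job create` from a dict of Spark properties."""
    args = ["cde", "job", "create",
            "--application-file", application_file,
            "--name", job_name]
    for prop, value in spark_conf.items():
        if prop in SIZING_FLAGS:
            # Sizing properties map onto their dedicated flags.
            args += [SIZING_FLAGS[prop], str(value)]
        else:
            # Anything else is passed through as a --conf key=value pair.
            args += ["--conf", f"{prop}={value}"]
    args += ["--type", "spark"]
    return args


conf = {
    "spark.executor.instances": 2,
    "spark.executor.cores": 6,
    "spark.executor.memory": "10g",
}
# Print a copy-pasteable shell command (shlex.join needs Python 3.8+).
print(shlex.join(cde_job_create_args("load_data_job", "load_data.py", conf)))
```

Keeping the arguments as a list means the same helper can feed `subprocess.run` in a CI/CD script without any shell quoting concerns.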

 

 

Configuration flags should be added under the Configurations section of the UI:
configurations.png
or passed with the '--conf' option in the CDE CLI:

 

 

cde job create --application-file <path-to-file> --name <job_name> --conf spark.kerberos.access.hadoopFileSystems=s3a://nyc-tlc,s3a://blaw-sandbox2-cdp-bucket --type spark

 

 

Example Jobs

In this example, we will read in some of the NYC Taxi dataset. Refer to Analyzing 1.1 Billion NYC Taxi and Uber Trips, with a Vengeance for a full write-up on the NYC Taxi dataset and cool things that can be done with it.
For the purposes of this exercise, we will read in the pre_2015 NYC Taxi dataset and the first half of 2015 taxi dataset from the AWS open data registry.
For this particular exercise, we need to access the AWS open data `s3a://nyc-tlc` bucket. By default, CDP security settings only allow access to the CDP data bucket set up during the initialization of the CDP Environment. Refer to External AWS Bucket Access in CDP Public Cloud for a full write-up on how to access external buckets for CDP Public Cloud.
For the purposes of this exercise, I added a policy to my `DataLake Admin Role` to access `nyc-tlc`. To ensure that the Spark Applications in CDE can take advantage of this, I added in the Spark configuration 'spark.kerberos.access.hadoopFileSystems' with the value set to 's3a://nyc-tlc,s3a://blaw-sandbox2-cdp-bucket'. In my case, my default CDP data bucket is 'blaw-sandbox2-cdp-bucket' but yours will vary.
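The value of that setting is just a comma-separated list of `s3a://` URIs, so it is easy to generate when the bucket list changes. Below is a small sketch; the function name `hadoop_filesystems_conf` is hypothetical, and the bucket names are the ones used in this article.

```python
def hadoop_filesystems_conf(buckets):
    """Return the key=value string for spark.kerberos.access.hadoopFileSystems."""
    # Accept bare bucket names or full s3a:// URIs and normalize to URIs.
    uris = [b if b.startswith("s3a://") else f"s3a://{b}" for b in buckets]
    return "spark.kerberos.access.hadoopFileSystems=" + ",".join(uris)


# The external bucket plus the default CDP data bucket (yours will differ).
print(hadoop_filesystems_conf(["nyc-tlc", "blaw-sandbox2-cdp-bucket"]))
```

The resulting string can be pasted into the Configurations section of the UI or passed after `--conf` on the CLI.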

Building Jobs for CDE - Python

For Python developers, explore the folder 'src/main/python'. In there are two jobs, load_data.py and etl_job.py. Both are quite simple Spark applications with no external dependencies. We will get to dependency management later.
Exploring load_data.py, you can see that the `SparkSession` segment is quite concise. Also of note is that the Cloudera SDX layer sets the Kerberos settings and makes sure that AWS IAM roles are handled automatically; there is no need to assign these in code through Spark configuration.
For deploying the `load_data` script, we can simply define it as a new job in the CDE UI.
For this particular example, we set up the load_data script using the following settings:
load_data_config.png
For the `etl_job` script, we set it up as below:
etl_job.png
Once they are running, we can see them appear under Job Runs:
job_runs.png

Python Dependency Management

For many advanced jobs, pure PySpark may not suffice and it may be necessary to import additional Python packages. For in-house libraries and extra scripts, the approach is straightforward. The CDE UI or the `--py-file`, `--jar` and `--file` flags in the CLI allow for attaching extra packages and files into the execution environment as per a standard `spark-submit`.
It is quite common to leverage external repositories like `PyPI`, however, and these cannot simply be attached. Common solutions include using things like `conda-pack`, `venv` or `PEX` to package up Python libraries into a bundle to be unzipped by the executors as part of running the job.
With CDE, we have a built-in option akin to the `venv` solution. Note that at the time of writing, setting this up can only be done via the CLI. All we need for this is a standard `requirements.txt` file, as is standard practice with Python projects, and our CDE CLI set up.
CDE has the concept of resources, which are collections of files, environments, etc. that can be mounted into the instances running a Spark job.
To create a new resource, run:

 

 

cde resource create --name my_custom_env --type python-env

 

 

To upload a `requirements.txt` file, run:

 

 

cde resource upload --name my_custom_env --local-path requirements.txt

 

 

In this case, `--local-path` refers to the local instance where you are running the CDE CLI.
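For CI/CD scripts, the two steps above can be generated together so that the resource name is only written once. This is a sketch under the assumption that the commands are run in order from the directory holding `requirements.txt`; the helper name `python_env_commands` is hypothetical, and the flags are exactly those shown above.

```python
import shlex


def python_env_commands(resource_name, requirements_path="requirements.txt"):
    """Return the two CDE CLI commands that create and populate a python-env resource."""
    create = ["cde", "resource", "create",
              "--name", resource_name, "--type", "python-env"]
    upload = ["cde", "resource", "upload",
              "--name", resource_name, "--local-path", requirements_path]
    # Order matters: the resource must exist before the file is uploaded to it.
    return [create, upload]


for command in python_env_commands("my_custom_env"):
    print(shlex.join(command))
```

To execute rather than print the commands, each list could be handed to `subprocess.run(command, check=True)` on a machine where the CDE CLI is configured.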
There are currently limitations with this process. Many advanced Data Science and Analytics libraries are not pure Python code and require extra OS-level libraries or compiling C++ libraries to ensure good performance.
For these use-cases, a fully customized image is required for CDE to start the drivers and executors with. Note that this is currently in Tech Preview and will require the assistance of your account team to activate. Building a full custom environment requires the use of Docker and proper Cloudera Paywall credentials to pull the base image from the Cloudera docker repo.
See `cde-docker-image` for an example image. The main thing to note is the base image:

 

 

FROM container.repository.cloudera.com/cloudera/dex/dex-spark-runtime-3.1.1:1.7.0-b129

 

 

Depending on what version of Spark and what version of CDE is being run, a different base image will be required. The base image, though it is an Ubuntu image, is set up to use `dnf` for package management, not `apt` as per normal.
Once you have your image built and uploaded to a docker repo that CDE can reach, it is possible to create a CDE job that leverages this. Note, customizing the base image can also only be done in the CLI at this stage.
First, create a resource as before:

 

 

cde resource create --type="custom-runtime-image" --image-engine="<spark2 or 3>" --name="<runtime_name>" --image="<path_to_your_repo_and_image>"

 

 

Then, create your job on top of this image:

 

 

cde job create --type spark --name <your_job_name> --runtime-image-resource-name <runtime_name> --application-file <your_job_file>

 

 

For more information, including using password-protected docker repos, refer to Managing Python dependencies for Spark workloads in Cloudera Data Engineering.

Building Jobs for CDE - Scala

To work with Scala in CDE, you need an environment in which to build jar files. In this particular repo, we will leverage sbt, though Maven is an option too. Refer to Installing sbt for more information on setting up an sbt environment for compiling jars. It may be useful to set this up in a Docker container for easy reproducibility.
There are two key parts of a Scala project. The build.sbt file is in the root path where we will run `sbt`. This file includes information about the required libraries. Of note in the sample is the `provided` flag, which tells sbt that these libraries are already available in the environment where the job runs and should not be packaged into the jar. The CDE base image already includes Spark, Hadoop and other base dependencies. Including them again could cause issues.
For novice Scala developers, it is also important to note that Scala versions are not cross-compatible. Spark 3.0 and 3.1 only support Scala 2.12.x, for example. Once we have correctly set up our Scala code under `src/main/scala` with an accompanying build.sbt in the base directory, we can run `sbt clean package`.
 
For first-time Scala developers, it is worth noting that there are two main approaches to building jar files for Spark. Jars can be built with all external dependencies included. This can make life easier, but it also makes jar files larger.
They can also be built with no dependencies included, in which case any external dependencies have to be attached to the job separately. In our case, we included the Deequ library for data validation. So, we need to attach the Deequ jar for Scala 2.12 as one of the extra jars to our job. This can get pretty messy pretty fast though. For example, 'geomesa-spark-jts', which adds GIS functionality to Spark, depends in turn on other external libraries. We would have to identify these and upload them as well.
uploading_dependencies.png
Jars without dependencies are commonly called slim jars and those with dependencies are called fat jars.
To build jars with dependencies, add a `project` folder with a file `plugins.sbt`. Inside this file, add:

 

 

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

 

 

The build command then becomes 'sbt clean assembly'. For more information on using sbt for Spark jobs, refer to Introduction to SBT for Spark Programmers. For extra details on the assembly plugin, refer to https://github.com/sbt/sbt-assembly.
For small Spark jobs, CDE does support uploading a script directly through the CLI, and the system will compile it on the fly. To execute your job in this manner, run:

 

 

cde spark submit <scala_job>.scala --job-name <your_job_name>

 

 

Scheduling Jobs

By default, CDE supports cron for scheduling jobs. Cron has a specific syntax like `0 12 * * ?`. To understand what these expressions mean and how to write one, refer to A Guide To Cron Expressions. Alternatively, try this tool to easily test out cron expressions and see what they mean:
https://crontab.guru/#*/30_*_*_*_*
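To make the syntax less opaque, the sketch below splits an expression into its named fields. It assumes the common five-field layout (minute, hour, day of month, month, day of week) and does no validation beyond counting fields; the function name `describe_cron` is hypothetical. The `?` in the last position is the Quartz-style marker for "no specific value", so the example expression reads as "every day at 12:00".

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Label each field of a five-field cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    # Pair each positional value with the name of the field it occupies.
    return dict(zip(CRON_FIELDS, parts))


for field, value in describe_cron("0 12 * * ?").items():
    print(f"{field:>12}: {value}")
```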
Whilst powerful, cron doesn't help with scheduling interdependencies between jobs. For this, we need to leverage the built-in Airflow scheduler. Airflow is a powerful scheduling tool originally developed by Airbnb that allows for coding up advanced interdependencies between jobs.
The core unit in Airflow is a Directed Acyclic Graph (DAG). The DAG determines the execution order of the tasks and sets the parameters for each task to be run. Tasks in Airflow are defined through `Operators`, since each one "operates" to execute a different task. For executing CDE jobs, we have a `CDEJobRunOperator`, which is used as follows:

 

 

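from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

# cde_process_dag is the DAG object defined earlier in the same file
my_airflow_task = CDEJobRunOperator(
    task_id='loader', # a name to identify the task
    dag=cde_process_dag, # the DAG that this operator will be attached to
    job_name='load_data_job' # the job that we are running.
)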

 

 

Note: The `job_name` must match how the job was set up in CDE.

Jobs_screen.png
For example, with the above setup, we could sequence the Spark jobs `etl_job_expanded`, `load_data_expanded`, `etl_job` and `load_data_job`. In Airflow, individual DAGs and their associated operators are defined in Python files. For example, see `src/main/airflow/airflow_job.py`.
Note that there can only be one DAG per Airflow job. For most workflows, this will suffice. Airflow offers support for branching and merging dependencies too. The only time when we might need to coordinate multiple DAGs is if we need `Operators` running at different time intervals that have interdependencies. That is a pretty advanced topic and we will not cover this here.
With the individual operators and the DAG defined, we can set the execution order of the jobs. For example:

 

 

start >> load_data_job >> etl_job >> end

 

 

This means that the execution order of the operators is `start`, then `load_data_job`, then `etl_job`, and finally `end`. We can also define the dependencies across multiple lines. For example, to do branching, we could define something as follows:

 

 

start >> load_data_job
load_data_job >> etl_job_1
load_data_job >> etl_job_2
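To see how `>>` lines like these turn into an execution order, here is a toy model in plain Python. It is not the Airflow API: the `Task` class and `execution_order` function are stand-ins written for illustration, and the ordering uses the standard library's `graphlib` (Python 3.9+) rather than Airflow's scheduler.

```python
from graphlib import TopologicalSorter


class Task:
    """Toy stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # ids of tasks that must finish first

    def __rshift__(self, other):
        # `a >> b` records that b waits for a, and returns b so chains work.
        other.upstream.add(self.task_id)
        return other


def execution_order(tasks):
    """Return one valid run order in which every task follows its upstream tasks."""
    graph = {task.task_id: task.upstream for task in tasks}
    return list(TopologicalSorter(graph).static_order())


start = Task("start")
load_data_job = Task("load_data_job")
etl_job_1 = Task("etl_job_1")
etl_job_2 = Task("etl_job_2")

start >> load_data_job
load_data_job >> etl_job_1
load_data_job >> etl_job_2

print(execution_order([start, load_data_job, etl_job_1, etl_job_2]))
```

In the branching case, the two ETL tasks depend only on `load_data_job` and not on each other, so a scheduler is free to run them in parallel once the load has finished.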

 

 

Troubleshooting and Debugging

So now, you can set up and schedule Python and Scala jobs in CDE. But what happens when things go wrong? First, we need to have some understanding of how CDE fits together. In CDP Public Cloud, everything starts with the environment.
An environment consists of a CDP DataLake, which also contains the core security services. Attached to this is a CDE Service. There can be multiple CDE Services per DataLake. The way that a CDE Service is started determines what compute instances jobs can run on and the maximum number of compute instances that can be active.
For each CDE service, we can have multiple virtual clusters. The Spark version and max autoscale capacity of jobs are set on the virtual cluster level. In short:

CDP Environment (DataLake)

- CDE Service 1 (node-level scaling set here)
       - Virtual Cluster 1 (job-level scaling set here)
             - Job 1
             - Job 2..x
       - Virtual Cluster 2...x
- CDE Service ... x
Depending on the type of issue that we are troubleshooting, we will need to examine what is going on at different levels.

Understanding capacity issues

A common issue is running out of capacity. Remember that we set this on two levels: a hard cap on the number of nodes allowed at the Service level, and a hard cap on the resources for a specific virtual cluster.
cde_service_menu.png
Above we can see the menu screen for the CDE service. Note that currently 0/75 nodes are active, with 0/1200 CPUs and 0/4800 GB of memory being used. We can monitor the status of the cluster with the Grafana Charts and Resource Scheduler buttons.
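A quick back-of-the-envelope calculation helps when relating these caps to job settings. The sketch below is a rough estimate only: it assumes all nodes are identical, and it ignores the driver, Spark memory overhead and any capacity reserved for system services. Both function names are hypothetical.

```python
def per_node_capacity(max_nodes, max_cpus, max_memory_gb):
    """Average CPUs and memory per node implied by the service-level caps."""
    return max_cpus / max_nodes, max_memory_gb / max_nodes


def executor_request(num_executors, executor_cores, executor_memory_gb):
    """Total CPUs and memory requested by a job's executors (driver excluded)."""
    return num_executors * executor_cores, num_executors * executor_memory_gb


# Caps from the service screen: 75 nodes, 1200 CPUs, 4800 GB of memory.
cpus_per_node, memory_per_node = per_node_capacity(75, 1200, 4800)
print(f"per node: {cpus_per_node} CPUs, {memory_per_node} GB")

# A job sized as 2 executors with 6 cores and 10 GB each.
cpus_needed, memory_needed = executor_request(2, 6, 10)
print(f"job executors: {cpus_needed} CPUs, {memory_needed} GB")
```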
The Grafana Charts show the status of the cluster and a general number of jobs running.
cde_service_level_grafana.png
To understand a bit more about what our cluster is up to, we need to check the Resource Scheduler as well. Clicking on that tab will take us to:
cluster_yunikorn_menu.png
Under the Queue tab in the YuniKorn menu, we can see the current status of the queues in the system. Now, this is the first confusing bit. Why is the queue showing so little memory and why is CPU 51% utilized? This is because YuniKorn only knows about the nodes that are currently active and not how many we can potentially spin up. As jobs run and the cluster scales, we will see this number increasing.
cde_yunikorn_queue.png
On the Virtual Cluster level, we have this cluster overview. Unfortunately, we cannot see how much of the 70 CPU and 422Gi of memory is consumed.
cde_virtual_cluster.png
But, we can go into the Grafana screen for the virtual cluster and see how many resources we are using and also potentially requesting.
cde_vc_grafana_cluster.png
Through the analysis of those screens, we can understand the state of resource consumption in the CDE Service. Issues with jobs starting will commonly be due to issues at this level, with either the YuniKorn scheduler being unable to schedule jobs or the cluster autoscaler being unable to add nodes. It is worth pointing out at this juncture that, for AWS at least, there can be limits on the number of instances of a given type that an account can spin up. In heavily used environments, it can be easy to hit those caps.
To understand cluster level stats and issues, check the Diagnostics tab in the menu at the CDE Service level. This provides the ability to download a Diagnostics Bundle and also a Summary status. Our support teams will also require these in order to be able to troubleshoot issues in depth.

Diagnosing Jobs and Spark Code

So, our job started fine, but it failed to complete successfully. In this case, we will have to look into the job run itself. Here is an example of a failed job:
failed_job.png
The first place to check is the logs:
job_logs.png
As we can see above, there are two sets of logs. The `stderr` logs are from the OS tier, and the `stdout` logs contain the errors from Spark itself.
spark_stdout.png
In this case, we can see that the job failed because the directory that the job was trying to write to already existed. Once we correct that, the job will run fine, as we can see from the successful second run above.
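One common way to avoid this class of failure is to set the save mode explicitly when writing. The sketch below wraps that choice in a small helper; the function name `write_output`, the Parquet output format and the example path are assumptions for illustration and are not taken from the sample repo. `DataFrameWriter.mode` and its `"overwrite"` and `"errorifexists"` values are standard PySpark.

```python
def write_output(df, path, overwrite=False):
    """Write a Spark DataFrame as Parquet, optionally replacing existing output."""
    # Spark's default save mode ("errorifexists") fails when the target path
    # already exists; "overwrite" replaces the existing data instead.
    mode = "overwrite" if overwrite else "errorifexists"
    df.write.mode(mode).parquet(path)


# Example call inside a job (hypothetical DataFrame and path):
# write_output(taxi_df, "s3a://<your-bucket>/output/taxi", overwrite=True)
```

Whether overwriting is the right choice depends on the job: for a rerunnable load it is usually what we want, while for append-style pipelines failing loudly may be the safer default.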

Conclusion

That was a quick whirlwind tour of Cloudera Data Engineering, Cloudera's cloud-native autoscaling Spark Service. Happy Data Engineering!
Comments
New Contributor

Hello,

 

I am trying to interact with hdfs from my spark app running on data engineering experience to list the directories and see the size of a directory.

do you have any idea how to do that?

 

Thank you

New Contributor

Thanks for the blog!