Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
avatar
Contributor

Objective

CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch and Streaming Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.

CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.

CDE Sessions are short-lived development environment used by Data Engineers and Analysts in order to iterate upon and build Spark workloads, prototype pipelines, or simply explore data. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2".

In this example we will use a CDE Session to perform basic Cloud Storage File System operations with the Hadoop API. The API was originally used to perform file system operations in HDFS but is still used to interact with S3 and ADLS in the Public Cloud.

Requirements

The only requirement for this exercise is a CDE Virtual Cluster in a CDE Service on version 1.19+. Version 1.20+ is recommended.

Copying the following commands in a CDE Session is recommended, although you can also run the same code in a CDE Job of type Spark.

Step by Step Instructions

Preload some data in a cloud storage location of your choice. Then, specify the path as a Python variable. In this example we use two locations to read and write data with AWS S3.

>>> readlocation = "s3a://go01-demo/mycsvfile.csv"
>>> writelocation = "s3a://go01-demo/newdir/myfile"

 Read the file in a Spark Dataframe:

>>> df=spark.read.csv(readlocation)

 Obtain the configured FileSystem implementation:

>>> myPath = "s3a://go01-demo/newdir"

>>> sc=df.rdd.context
>>> hadoop = sc._jvm.org.apache.hadoop
>>> hdfs = hadoop.fs.FileSystem.get(sc._jvm.java.net.URI.create(myPath), sc._jsc.hadoopConfiguration())

Write the dataframe as a file in Cloud Storage:

>>> df.write.mode("overwrite").parquet(writelocation)

List the contents of the directory:

>>> status = hdfs.listStatus(hadoop.fs.Path("s3a://go01-demo/newdir"))
>>> for fileStatus in status:
      print(fileStatus.getPath())

s3a://go01-demo/newdir/myfile  

List the contents of a directory with filter pattern:

>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
        print(fileStatus.getPath())

s3a://go01-demo/test_file.csv

Rename file:

>>> hdfs.rename(hadoop.fs.Path("s3a://go01-demo/test_file.csv"), hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"))
True

>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
        print(fileStatus.getPath())

s3a://go01-demo/test_file_2.csv

Delete file:

>>> hdfs.delete(hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"), True)
True

>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
        print(fileStatus.getPath())

Create a subdirectory:

>>> hdfs.mkdirs(hadoop.fs.Path("s3a://go01-demo/newdir/newsubdir"))
True

References

The full API Method list is located at this link.

Summary and Next Steps

Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.

In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:

363 Views