Objective

CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Built on Apache Spark, CDE is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Apache Sedona™ (formerly known as "GeoSpark") is a cluster computing system for processing large-scale spatial data. Sedona extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed Spatial Datasets and Spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines.

Sedona jobs can run in CDE with minor configuration changes. This article will present a simple workflow to help you get started with Spark Geospatial use cases in CDE.

All code, dependencies, and CLI commands are available for reuse in this git repository.

Requirements

The following are required to reproduce the demo in your CDE Virtual Cluster:

  • CDE Service version 1.19 or above
  • A working installation of the CDE CLI. Instructions to install the CLI are provided here.
  • A working installation of git on your local machine. Please clone this git repository and keep in mind that all commands are assumed to be run from the project's main directory.

Custom Docker Runtime as CDE Resource

By default, CDE workloads rely on CDE Resources to mount dependencies into a Job container. CDE Resources can be of three types: Files, Python Environment, and Custom Docker Runtime.

In our case we will create a Custom Docker Runtime to ensure Sedona and all required packages are correctly installed. Open the Dockerfile located in the CDE_Runtime folder to explore it in more detail.

The steps below have already been executed, and the image is available for reuse from my public Docker registry. If you prefer to build your own image, replace my Docker username and run the following commands from your terminal:

 

docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 . -f Dockerfile

docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 /bin/bash

docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003
 
Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:
 
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco

cde resource create --name dex-spark-runtime-sedona-geospatial-pauldefusco --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 --image-engine spark3 --type custom-runtime-image

CDE Files Resources for Datasets

Sedona is compatible with many geospatial file formats. In our example we will load geospatial countries data into a CDE Files Resource so it can be read by the CDE Spark Job. The "cde resource upload-archive" command allows you to bulk upload and extract a zipped folder:

 

cde resource create --name countries_data

cde resource upload-archive --name countries_data --local-path data/ne_50m_admin_0_countries_lakes.zip

CDE Files Resources for Job Code

Next, create another CDE Files Resource to store your code and dependencies in your Virtual Cluster for reuse.

Before running the below commands, open parameters.conf and replace the Cloud Default Storage location and CDP Workload Username with your own values. If you need help finding these, please contact your CDP Administrator. A hypothetical example of how the job scripts could read this file is shown after the commands.

cde resource create --name job_code

cde resource upload --name job_code --local-path code/geospatial_joins.py --local-path code/geospatial_rdd.py --local-path code/parameters.conf --local-path code/utils.py
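For reference, the following is a hypothetical illustration, not the actual logic in utils.py, of how a job script could read the mounted parameters.conf with Python's configparser. The mount path, section, and key names are assumptions chosen to match the variables used later in this walkthrough.

import configparser

# Hypothetical example: a Files Resource mounted with prefix "jobCode/" is visible
# to the job under /app/mount/jobCode; the real parsing logic lives in utils.py
config = configparser.ConfigParser()
config.read("/app/mount/jobCode/parameters.conf")

# Assumed section and key names; adjust to match the actual parameters.conf
data_lake_name = config.get("general", "data_lake_name")
username = config.get("general", "username")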

Geospatial RDD Spark Job in CDE

The Sedona SpatialRDD allows you to store geospatial data with custom data types in a Spark RDD, which you can then use for geospatial queries. In this section we will create a CDE Spark Job that manipulates Sedona SpatialRDDs.

First, create the CDE Spark Job with the following command.

Notice that the CDE CLI allows you to specify multiple CDE Files Resources, each with a prefix. The prefix can be used within the Python script to reference data or files in one or more CDE Files Resources. In this CLI command we reference the job_code and countries_data Files Resources to locate the script file and to access the data in geospatial format, respectively:

Also notice that the CDE Custom Docker Runtime is referenced with the "runtime-image-resource-name" parameter.

Finally, at the time of this writing Sedona 1.5 is the latest version and, similar to a spark-submit, we use the --packages parameter to load the required Maven packages into our CDE Spark Job.

 

cde job create --name geospatial_rdd --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 --application-file jobCode/geospatial_rdd.py

Next, open "geospatial_rdd.py" in your editor. Familiarize yourself with the code and notice the following:

  • The SparkSession is created and then passed to the SedonaContext object. From then on, all Spark SQL queries run through the SedonaContext:
spark = SparkSession \
    .builder \
    .appName("IOT DEVICES LOAD") \
    .config("spark.kubernetes.access.hadoopFileSystems", data_lake_name) \
    .getOrCreate()

config = SedonaContext.builder().getOrCreate()

sedona = SedonaContext.create(spark)

sc = sedona.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")
  • Data is read from the CDE Files Resource via the mount prefix (alias) specified in the CDE CLI command, and a SpatialRDD is created from it:
countries_rdd = ShapefileReader.readToGeometryRDD(sc, "/app/mount/countriesData")
  • Sedona allows you to store data in Cloud Storage in GeoJSON format:
countries_rdd.saveAsGeoJSON(data_lake_name + geoparquetoutputlocation + "/countries.json")
  • A Spark DataFrame can be created from a SpatialRDD using the Sedona Adapter:
countries_df = Adapter.toDf(countries_rdd, sedona)
  • You can generate geospatial POINT data with your favorite utility, as shown in utils.py. The data can then be transformed into Sedona POINT data using Sedona SQL:
dg = DataGen(spark, username)

iot_points_df = dg.iot_points_gen(row_count = 100000, unique_vals=100000)
iot_points_df.createOrReplaceTempView("iot_geo_tmp")

iot_points_geo_df = sedona.sql("SELECT id, device_id, manufacturer, event_type, event_ts, \
ST_Point(CAST(iot_geo_tmp.latitude as Decimal(24,20)), \
CAST(iot_geo_tmp.longitude as Decimal(24,20))) as arealandmark \
FROM iot_geo_tmp")
  • Sedona allows you to spatially partition and index your data. The iotSpatialRDD used here is derived from the DataFrame above; see the sketches after this walkthrough for the conversion and an optional spatial index:
iotSpatialRDD.spatialPartitioning(GridType.KDBTREE)
  • We can use the SpatialRDD to run geospatial queries on our data; for example, we calculate the distance between each POINT object and a predefined point in our query, e.g. Point(21, 52):
iotSpatialRDD.rawSpatialRDD.map(lambda x: x.geom.distance(Point(21, 52))).take(5)
  • Finally, we run a KNN query on the same data to obtain the k POINT objects that lie closest to the provided POINT object:
result = KNNQuery.SpatialKnnQuery(iotSpatialRDD, Point(-84.01, 34.01), 5, False)
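
The excerpts above jump from the Sedona DataFrame to iotSpatialRDD. The following is a minimal bridging sketch; the Adapter conversion and the GeoJSON write are assumptions about how geospatial_rdd.py connects these steps, with the GeoJSON path mirroring the one read by the joins job below.

from sedona.utils.adapter import Adapter

# Assumption: convert the Sedona DataFrame into a SpatialRDD before partitioning it
iotSpatialRDD = Adapter.toSpatialRdd(iot_points_geo_df, "arealandmark")
iotSpatialRDD.analyze()  # computes the dataset boundary required for spatial partitioning

# Assumption: persist the IoT SpatialRDD as GeoJSON so the joins job can read it back
iotSpatialRDD.saveAsGeoJSON(data_lake_name + geoparquetoutputlocation + "/iot_spatial.json")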
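
In addition to partitioning, Sedona can build a spatial index on the RDD and use it in queries. The snippet below is a minimal sketch rather than repository code; the index type and the bounding-box coordinates are illustrative assumptions.

from sedona.core.enums import IndexType
from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQuery

# Build a QUADTREE index on the raw RDD (pass True instead to index the
# spatially partitioned RDD, e.g. for join queries)
iotSpatialRDD.buildIndex(IndexType.QUADTREE, False)

# Count devices inside an illustrative bounding box, using the index
range_window = Envelope(-85.0, -83.0, 33.0, 35.0)
matches = RangeQuery.SpatialRangeQuery(iotSpatialRDD, range_window, True, True)
print(matches.count())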

Run the Job with the CLI:

cde job run --name geospatial_rdd --executor-cores 2 --executor-memory "4g"

Geospatial Join Spark Job in CDE

Similarly to the above, create a "geospatial_joins" CDE Spark Job using the CLI:

cde job create --name geospatial_joins --application-file jobCode/geospatial_joins.py --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 

Then open "geospatial_joins.py" and notice that the synthetic IoT data is joined with the countries data loaded from the CDE Files Resource.

  • The data is loaded from Cloud Storage in GeoJSON format:
from sedona.core.formatMapper import GeoJsonReader

geo_json_file_location = data_lake_name + geoparquetoutputlocation + "/iot_spatial.json"
saved_rdd_iot = GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)
  • Using Sedona SQL, the data is joined with the "ST_Contains" spatial predicate. Sedona matches POINT objects from the IoT dataset that fall within the boundaries of the POLYGON objects defining country shapes in the Countries dataset (a sketch of how the two temp views can be registered follows this walkthrough):
GEOSPATIAL_JOIN = """
SELECT c.geometry as country_geom,
c.NAME_EN,
a.geometry as iot_device_location,
a.device_id
FROM COUNTRIES_{0} c, IOT_GEO_DEVICES_{0} a
WHERE ST_Contains(c.geometry, a.geometry)
""".format(username)

result = sedona.sql(GEOSPATIAL_JOIN)
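
The join references two temp views, COUNTRIES_<username> and IOT_GEO_DEVICES_<username>. The following is a minimal sketch of how they could be registered from the GeoJSON SpatialRDDs; the saved_rdd_countries variable and the registration code are assumptions, since only an excerpt of geospatial_joins.py is shown above.

from sedona.utils.adapter import Adapter

# Assumption: both GeoJSON datasets are converted to DataFrames and exposed to Sedona SQL
countries_geo_df = Adapter.toDf(saved_rdd_countries, sedona)
iot_geo_df = Adapter.toDf(saved_rdd_iot, sedona)

countries_geo_df.createOrReplaceTempView("COUNTRIES_{0}".format(username))
iot_geo_df.createOrReplaceTempView("IOT_GEO_DEVICES_{0}".format(username))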

Run the Job with the CLI:

cde job run --name geospatial_joins --executor-cores 2 --executor-memory "4g"

Summary

This article showcased a simple geospatial use case with Apache Sedona. With Cloudera Data Engineering, you can easily run Sedona Spark jobs at scale.
