Objective
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, CDE provides an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Apache Sedona™ (formerly known as "GeoSpark") is a cluster computing system for processing large-scale spatial data. Sedona extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed Spatial Datasets and Spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines.
Sedona jobs can run in CDE with minor configuration changes. This article will present a simple workflow to help you get started with Spark Geospatial use cases in CDE.
All code, dependencies and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the demo in your CDE Virtual Cluster:
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a Job container. CDE Resources can be of three types: Files, Python Environment, and Custom Docker Runtime.
In our case we will create a Custom Docker Runtime to ensure Sedona and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore it in more detail.
The below steps have already been executed and the image is available for reuse in my public Docker registry. If you prefer, you can create your own image by replacing my Docker username and running the following commands from your terminal:
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 . -f Dockerfile
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-sedona-geospatial-pauldefusco --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 --image-engine spark3 --type custom-runtime-image
CDE Files Resources for Datasets
Sedona is compatible with many geospatial file formats. In our example we will load geospatial countries data to a CDE Files Resource so it can be read by the CDE Spark Job. The "cde resource upload-archive" command allows you to bulk upload and extract a zipped folder:
cde resource create --name countries_data
cde resource upload-archive --name countries_data --local-path data/ne_50m_admin_0_countries_lakes.zip
CDE Files Resources for Job Code
Next, create another CDE Files Resource to store your code and dependencies in your Virtual Cluster for reuse.
Before running the below command, open parameters.conf and replace the Cloud Default Storage location and CDP Workload Username with your own values. If you need help finding these, please contact your CDP Administrator.
cde resource create --name job_code
cde resource upload --name job_code --local-path code/geospatial_joins.py --local-path code/geospatial_rdd.py --local-path code/parameters.conf --local-path code/utils.py
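For context, the job scripts read these values at runtime from the mounted parameters.conf. The exact keys live in the repository; a minimal, hypothetical sketch of that parsing (section and key names are assumed here, not taken from the repo) could look like:

import configparser

# Hypothetical sketch: load the Cloud Storage location and workload username
# from the mounted Files Resource (section and key names are illustrative).
config = configparser.ConfigParser()
config.read("/app/mount/jobCode/parameters.conf")  # path follows the jobCode/ mount prefix used in the job definitions below
data_lake_name = config.get("general", "data_lake_name")
username = config.get("general", "username")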
Geospatial RDD Spark Job in CDE
The Sedona RDD allows you to store geospatial data with custom data types in a Spark RDD, which you can then use to perform geospatial queries. In this section we will create a CDE Spark Job to manipulate Sedona RDDs.
First create the CDE Spark Job with the following command.
Notice that the CDE CLI allows you to specify multiple CDE Files Resources, each with a prefix. The prefix can be used within the Python script to reference data or files in one or more CDE Files Resources. In this CLI command we reference the job_code and countries_data Files Resources to locate the script file and the data in geospatial format, respectively:
Also notice that the CDE Custom Docker Runtime is referenced with the "runtime-image-resource-name" parameter.
Finally, at the time of this writing Sedona 1.5 is the latest version; as with a spark-submit, we use the "packages" parameter to load the required Maven packages into our CDE Spark Job.
cde job create --name geospatial_rdd --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 --application-file jobCode/geospatial_rdd.py
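Inside the job container, each Files Resource is mounted under /app/mount/<prefix>, so the prefixes above translate into the paths the script reads from. A quick sketch for verifying this at runtime (purely illustrative):

import os

# The jobCode/ and countriesData/ prefixes map to directories under /app/mount
print(os.listdir("/app/mount/jobCode"))        # uploaded scripts and parameters.conf
print(os.listdir("/app/mount/countriesData"))  # extracted shapefile components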
Next, open "geospatial_rdd.py" in your editor. Familiarize yourself with the code and notice the following:
# Imports used by the snippets below (as found in the full script)
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.enums import GridType
from sedona.core.spatialOperator import KNNQuery
from sedona.utils.adapter import Adapter
from shapely.geometry import Point
from utils import DataGen  # synthetic data generator defined in the repository's utils.py

# data_lake_name, geoparquetoutputlocation and username come from the job's configuration
# Create the Spark session with access to Cloud Storage
spark = SparkSession \
    .builder \
    .appName("IOT DEVICES LOAD") \
    .config("spark.kubernetes.access.hadoopFileSystems", data_lake_name)\
    .getOrCreate()

# Register Sedona types and functions on the Spark session
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(spark)
sc = sedona.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")

# Load the countries shapefile from the mounted Files Resource into a SpatialRDD
countries_rdd = ShapefileReader.readToGeometryRDD(sc, "/app/mount/countriesData")
countries_rdd.saveAsGeoJSON(data_lake_name + geoparquetoutputlocation + "/countries.json")
countries_df = Adapter.toDf(countries_rdd, sedona)

# Generate synthetic IoT data and convert latitude/longitude into geometry points
dg = DataGen(spark, username)
iot_points_df = dg.iot_points_gen(row_count = 100000, unique_vals=100000)
iot_points_df.createOrReplaceTempView("iot_geo_tmp")
iot_points_geo_df = sedona.sql("SELECT id, device_id, manufacturer, event_type, event_ts, \
    ST_Point(CAST(iot_geo_tmp.latitude as Decimal(24,20)), \
    CAST(iot_geo_tmp.longitude as Decimal(24,20))) as arealandmark \
    FROM iot_geo_tmp")

# Spatially partition the IoT SpatialRDD, compute distances to a point, and run a KNN query
# (iotSpatialRDD is the SpatialRDD form of iot_points_geo_df; see the sketch below)
iotSpatialRDD.spatialPartitioning(GridType.KDBTREE)
iotSpatialRDD.rawSpatialRDD.map(lambda x: x.geom.distance(Point(21, 52))).take(5)
result = KNNQuery.SpatialKnnQuery(iotSpatialRDD, Point(-84.01, 34.01), 5, False)
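The iotSpatialRDD referenced above is the SpatialRDD counterpart of the geospatial DataFrame built with Spatial SQL. A minimal sketch of how that conversion could be done with Sedona's Adapter (assuming the geometry column is arealandmark, as in the query above):

from sedona.utils.adapter import Adapter

# Sketch: convert the Spatial SQL DataFrame into a SpatialRDD so it can be
# spatially partitioned and queried with KNNQuery, as shown above.
iotSpatialRDD = Adapter.toSpatialRdd(iot_points_geo_df, "arealandmark")
iotSpatialRDD.analyze()  # computes the dataset envelope before spatial partitioning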
Run the Job with the CLI:
cde job run --name geospatial_rdd --executor-cores 2 --executor-memory "4g"
Geospatial Join Spark Job in CDE
Similar to the above, create a "geospatial_joins" CDE Spark Job using the CLI:
cde job create --name geospatial_joins --application-file jobCode/geospatial_joins.py --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2
Then open "geospatial_joins.py"; notice that the synthetic IOT data is joined with the countries data loaded from a CDE Files resource.
# Read the IoT points previously saved as GeoJSON back into a SpatialRDD
from sedona.core.formatMapper import GeoJsonReader

geo_json_file_location = data_lake_name + geoparquetoutputlocation + "/iot_spatial.json"
saved_rdd_iot = GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)

# Spatial join: find the country polygon containing each IoT device point
# (the COUNTRIES_<username> and IOT_GEO_DEVICES_<username> temp views are
# registered elsewhere in the script)
GEOSPATIAL_JOIN = """
    SELECT c.geometry as country_geom,
           c.NAME_EN,
           a.geometry as iot_device_location,
           a.device_id
    FROM COUNTRIES_{0} c, IOT_GEO_DEVICES_{0} a
    WHERE ST_Contains(c.geometry, a.geometry)
    """.format(username)

result = sedona.sql(GEOSPATIAL_JOIN)
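If you want to persist the joined output, a hedged follow-up along these lines would preview it and write it back to Cloud Storage as GeoParquet (the output path is illustrative, not part of the original script):

# Sketch: preview the country/device matches and save them as GeoParquet
result.show(5, truncate=False)
result.select("NAME_EN", "device_id", "iot_device_location") \
    .write.format("geoparquet").mode("overwrite") \
    .save(data_lake_name + geoparquetoutputlocation + "/iot_by_country")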
Run the Job with the CLI:
cde job run --name geospatial_joins --executor-cores 2 --executor-memory "4g"
Summary
This article showcased a simple geospatial use case with Apache Sedona. With Cloudera Data Engineering, you can easily run Sedona Spark Jobs at scale.