11-16-2023
11:27 AM
The Cloudera Data Platform (CDP) is a hybrid data platform designed to deliver faster and easier data management, analytics, and AI at enterprise scale. Cloudera Machine Learning (CML) is one of CDP's Cloud Native Data Services, designed to enable secure, governed Data Science. Data Scientists and Engineers use CML to securely analyze large amounts of data via interactive notebooks.

Since 2022, CML has enhanced this capability with the Data Connections feature, which provides boilerplate code to access and push down SQL queries to the CDW service. This allows the user to run large-scale queries directly in the Data Warehouse while accessing and visualizing results from a simple notebook with a small consumption footprint. In 2023, CML doubled down with Custom Data Connections. Custom Data Connections allow CML users to create their own boilerplate connection code so they or their collaborators can easily connect to 3rd party tools such as Snowflake, Postgres, legacy on-prem databases (Oracle, MSSQL, MySQL), serverless cloud databases (Redshift, Snowflake, SAP HANA Cloud, BigQuery), APIs, and specialized data stores (Neo4j).

This article provides a tutorial on how to create a MySQL Custom Data Connection in your own CML Workspace. The code referenced is provided in this git repository.

Data Connections Refresher

You can use Data Connections from a CML Session to connect to a CDP environment (CDW or Spark) regardless of the Editor choice by following the steps below:

1. Open your CML Session and click on the "Data" tab at the top right of your screen.
2. Scroll down and select the CDW or Spark Data Lake connection of your choice.
3. Copy and paste the code into your Editor.
4. Run the sample SHOW DATABASES command and create your SQL statements.

Learning from the Postgres Custom Data Connection

The CML Engineering Team has created two custom data connection templates that you can reuse to connect to Snowflake and Postgres. In this section, we will use the Postgres template to learn how to deploy an existing custom connection.

Steps to Deploy the Postgres Custom Connection to your Project

1. Create a CML Project by cloning the provided template with this URL: https://github.com/pdefusco/Using_CustomConn_CML.git
2. Notice that the mysqlconn and postgresconn folders are included in the project. Open the postgresconn folder and familiarize yourself with the code in pg-conn.py. This is the source code for the Custom Connection:
- The PostgresCustomImp class is a child of CustomConnection, which is imported from cml.data_v1.customconnection. You must extend this class in order to create a custom connection.
- The get_base_connection method uses the psycopg2 module to establish a connection to a given Postgres source. The connection requires a hostname, a default port, a database name, a user, and a password. When implementing your own custom connection, you can choose which connection parameters to include as needed.
- The get_pandas_dataframe method executes the provided SQL via the Pandas read_sql method.
- The get_cursor method returns a cursor object. This is optional, as the SQL command is executed in get_pandas_dataframe.
- The parameters required to initialize a connection are set manually via the UI (see below). The override_parameters and check_params_or_env methods ensure that these are set correctly. While these methods are also optional, we recommend implementing them.
3. Navigate to the Workspace Site Administration page, open the Data Connections tab, and click on the "New Connection" icon.
4. Fill out the form as follows:
- Name: Custom Postgres
- Type: Custom Connection
- Type Display: Postgres
- Project: select your project
- Connection Files: select the "postgresconn" folder
- Custom Parameters: create the following four key-value pairs:
  - PG_HOST : hh-pgsql-public.ebi.ac.uk
  - PG_PORT : 5432
  - PG_DB : pfmegrnargs
  - PG_USER : reader

Note: in this example we are connecting to the RNAcentral public Postgres database. Please visit this URL for more information: https://rnacentral.org/help/public-database

5. Navigate back to your project. Open the "Project Settings" page and then the "Data Connections" tab. Make the new connection available in the project by clicking on "Sync with Workspace".
6. Launch a CML Session with JupyterLab as your Editor. A small resource profile without GPUs is fine, and there is no need to enable a Spark Runtime Add-On. The Data Connections window will load automatically. Notice the new "Custom Postgres" connection with a reusable template code block.
7. Open the "using_connections" notebook. Notice that the code block to connect to the Postgres database has already been prepopulated for you (a minimal sketch of this kind of snippet is shown at the end of this article); normally you would copy and paste it from the Data Connections pop-up window. Execute the first four cells and validate that query results are output in the notebook (do not run the entire notebook!).

Creating a MySQL Custom Data Connection

Now that we have connected to Postgres with a provided Custom Connection template, we can follow the same steps to create a custom MySQL connection.

Open the mysqlconn folder and familiarize yourself with the code. Notice that the source code for this connection is in a separate folder. This is required in order to select a folder in the Custom Data Connection creation form in the Workspace Settings.

As in the previous example, we create a MySQLCustomImp class which inherits from the CustomConnection interface in the cml.data_v1.customconnection module. This module does not have to be pip installed; it is already provided in CML by default. The implemented class methods are similar to the Postgres example. Notice that in this case we don't have a method to return the connection cursor. In addition, we leverage the mysql-connector-python package rather than psycopg2.

To use this custom connection, go through the same steps and set the following Custom Parameters:

- MYSQL_HOST : mysql-rfam-public.ebi.ac.uk
- MYSQL_PORT : 4497
- MYSQL_DB : Rfam
- MYSQL_USER : rfamro

Note: in this example we are connecting to the Rfam MySQL public database. For more information, please visit this URL: https://docs.rfam.org/en/latest/database.html

Conclusions

In this article we highlighted CML Custom Data Connections. In summary:

- CML is a Cloud Native Platform for Enterprise Machine Learning. The built-in integrations with the Cloudera Data Platform (CDP) allow CML Users to operate in a secure, governed Machine Learning environment at scale.
- CML Data Connections enhance Data Analysis, Exploration, and Model Experimentation by providing Data Scientists with an easy interface to process large amounts of data from a notebook with minimal compute resources.
- Custom Data Connections augment this capability by opening access to 3rd party systems such as external RDBMSs or cloud vendor databases such as Snowflake. CML Users are free to implement their own Custom Connections while giving CML Admins the ability to approve them at the Workspace level.

What Custom Data Connection are you using in CML? Please don't hesitate to comment with your favorite external data sources.
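As referenced in the notebook step above, here is a minimal sketch of the kind of prepopulated snippet you would use from a CML Session once the "Custom Postgres" connection is synced to the project. The table in the query is illustrative only and should be adapted to your target database; the connection object exposes the get_pandas_dataframe method implemented in the custom connection class described earlier.

import cml.data_v1 as cmldata

# "Custom Postgres" matches the connection name defined in the Workspace form above
conn = cmldata.get_connection("Custom Postgres")

# Illustrative query against the RNAcentral public database
df = conn.get_pandas_dataframe("SELECT * FROM rna LIMIT 10")
print(df.head())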
10-25-2023
08:27 PM
Objective
Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you maintain data quality and improve communication about data between teams. Software developers have long known that automated testing is essential for managing complex codebases. Great Expectations brings the same discipline, confidence, and acceleration to data science and data engineering teams. (Source: Great Expectations)
CDP Data Engineering is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Data Engineering is fully integrated with the Cloudera Data Platform, enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as data warehouses and machine learning. Data engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere. (Source: Streamline and operationalize data pipelines securely at any scale.)
Spark Data Engineers use Great Expectations in CDE to enforce data quality standards on enterprise datasets and data engineering pipelines at massive scale. In the rest of this tutorial, we will walk through a basic workflow with Great Expectations and Spark.
All code, dependencies, and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the demo in your CDE virtual cluster:
CDE Service version 1.19 and above
A working installation of the CDE CLI. Instructions to install the CLI are provided here.
A working installation of git on your local machine. Please clone the git repository and keep in mind that all commands assume they are run in the project's main directory.
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE Resources can be of three types: Files, Python Environment, and Custom Docker Runtime.
In our case, we will create a custom Docker runtime to ensure Great Expectations and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.
The below steps have already been executed, and the image is available for reuse in my public Docker registry. If you want, create your own image by personalizing the value in the --name parameter to reflect your username and replacing the value in the --docker-username parameter with your DockerHub username.
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality . -f Dockerfile
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality
Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-great-expectations-data-quality --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality --image-engine spark3 --type custom-runtime-image
CDE Files Resources for Job Code
Next, create a CDE Files Resource to store your code and dependencies in your Virtual Cluster for reuse.
Before running the below command, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP administrator. Then upload scripts and dependencies.
cde resource create --name job_code_data_quality
cde resource upload --name job_code_data_quality --local-path code/airflow.py --local-path code/batch_load.py --local-path code/great_expectations.py --local-path code/parameters.conf --local-path code/utils.py
Finally, we will create two CDE Spark jobs for the data quality pipeline and one CDE Airflow job to orchestrate it. Before doing so, open code/great_expectations.py and familiarize yourself with the code. Here are some of the most important points about the job:
We create a synthetic banking dataset with the Faker and dbldatagen libraries. This gives us 100,000 banking transactions with transaction amount, currency, account numbers, routing numbers, and much more banking data. The data columns are of different types, including numerical, timestamp, and categorical. Review the utils.py script to learn more details.
FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

# Partition parameters, etc.
self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

fakerDataspec = (
    DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
    .withColumn("name", percentNulls=0.1, text=FakerTextUS("name"))
    .withColumn("address", text=FakerTextUS("address"))
    .withColumn("email", text=FakerTextUS("ascii_company_email"))
    .withColumn("aba_routing", text=FakerTextUS("aba"))
    .withColumn("bank_country", text=FakerTextUS("bank_country"))
    .withColumn("account_no", text=FakerTextUS("bban"))
    .withColumn("int_account_no", text=FakerTextUS("iban"))
    .withColumn("swift11", text=FakerTextUS("swift11"))
    .withColumn("credit_card_number", text=FakerTextUS("credit_card_number"))
    .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider"))
    .withColumn("event_type", "string", values=["purchase", "cash_advance"], random=True)
    .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
    .withColumn("longitude", "float", minValue=-180, maxValue=180, random=True)
    .withColumn("latitude", "float", minValue=-90, maxValue=90, random=True)
    .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
    .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
)

df = fakerDataspec.build()
In ge_data_quality.py, the "SparkDFDataset" class is imported from the "great_expectations.dataset.sparkdf_dataset" module. This allows you to wrap a Spark Dataframe and make it compatible with the Great Expectations API
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset

gdf = SparkDFDataset(df)
From the GE Documentation, "Expectations are declarative statements and the interface to the Great Expectations library, supporting your validation, profiling, and translation. Expectations are implemented as classes; some are in the core library, with many others contributed by community members." In the script, we implemented a few wrapper methods for a variety of built-in expectations. For example, the "expect_column_to_exist" expectation checks for column existence.
assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
The GE Gallery offers a constantly growing number of expectations to test for different conditions on different column value types. For example, in the script, we use expectations to check for nulls for all columns: longitude and latitude min and max for transactions; mean and standard deviation for transaction amounts; email string validity with REGEX; and finally, whether the values in the categorical Currency column match or are contained in a provided test set. Here are some more examples that can be found in the ge_data_quality.py script:
# EXPECTATION ON ALL COLUMNS
# Ensure the existence of all mandatory columns
def run_existance_expactation(gdf, MANDATORY_COLUMNS):
    for column in MANDATORY_COLUMNS:
        try:
            assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
            print(f"Column {column} is found")
        except Exception as e:
            print(e)
# EXPECTATION ON NUMERIC COLUMN
# Ensure minimum longitude is between -180 and 180
def run_longitude_min_expectation(gdf):
    try:
        test_result = gdf.expect_column_min_to_be_between(column="longitude", min_value=-180, max_value=180)
        assert test_result.success, "Min for column longitude is not within expected range\n"
        print("Min for column longitude is within expected range\n")
    except Exception as e:
        print(e)
# EXPECTATION ON STRING COLUMNS
# Use REGEX to ensure email is in correct format
def run_email_match_regex_expectation(gdf):
    try:
        test_result = gdf.expect_column_values_to_match_regex(column="email", regex="^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$")
        assert test_result.success, "Values for column Email are not within REGEX pattern."
        print("Values for column Email are within REGEX pattern\n")
    except Exception as e:
        print(e)
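Along the same lines, a check that the categorical currency column only contains allowed values could be wrapped as follows. This is a hypothetical wrapper (not taken from the repository) built on the core expect_column_values_to_be_in_set expectation; the column name and value set match the synthetic dataset generated earlier.

# EXPECTATION ON A CATEGORICAL COLUMN
# Hypothetical wrapper: ensure transaction currencies fall within an allowed set
def run_currency_in_set_expectation(gdf, allowed_currencies):
    try:
        test_result = gdf.expect_column_values_to_be_in_set(column="transaction_currency", value_set=allowed_currencies)
        assert test_result.success, "Values for column transaction_currency are not within the allowed set."
        print("Values for column transaction_currency are within the allowed set\n")
    except Exception as e:
        print(e)

run_currency_in_set_expectation(gdf, ["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])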
Please review the ge_data_quality.py script to find more examples.
Again, the idea is to organize tests in a clear, reliable, and reusable manner. When combined with the modularity of the CDE Job and Dependency construct, using Great Expectations and Spark allows you to structure your data pipelines under clear data quality rules.
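For instance, the wrappers above might be tied together in the Spark job's entry point along these lines. This is a minimal sketch: the column list is illustrative, and the functions are the ones defined earlier in this article.

# Hypothetical driver logic calling the expectation wrappers defined above
MANDATORY_COLUMNS = ["name", "email", "aba_routing", "account_no",
                     "longitude", "latitude", "transaction_currency", "transaction_amount"]

gdf = SparkDFDataset(df)
run_existance_expactation(gdf, MANDATORY_COLUMNS)
run_longitude_min_expectation(gdf)
run_email_match_regex_expectation(gdf)
run_currency_in_set_expectation(gdf, ["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])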
Finally, the Airflow Job is scheduled to run every five minutes by default. Therefore, there is no need to manually run any of the above jobs. Run the below commands to create the jobs. Then open the CDE Job Runs page to validate the pipeline.
cde job create --name batch_load --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/batch_load.py
cde job create --name data_quality --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/great_expectations.py
cde job create --name data_quality_orchestration --type airflow --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --dag-file airflow.py
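The airflow.py DAG itself is not reproduced above. A minimal version orchestrating the two Spark jobs might look like the following sketch, which uses the CDEJobRunOperator shipped with CDE's embedded Airflow; the schedule and start date are illustrative, and the file in the repository may differ.

# airflow.py - hypothetical minimal orchestration DAG
from datetime import datetime
from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

with DAG(
    dag_id="data_quality_orchestration",
    schedule_interval="*/5 * * * *",   # run every five minutes, as noted above
    start_date=datetime(2023, 10, 1),  # illustrative start date
    catchup=False,
) as dag:
    batch_load = CDEJobRunOperator(task_id="batch_load", job_name="batch_load")
    data_quality = CDEJobRunOperator(task_id="data_quality", job_name="data_quality")
    batch_load >> data_quality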
Summary
With CDE Runtimes, you can choose any data quality package of your choice. This article in particular showcased a simple data quality pipeline with Great Expectations, a leading open source package for testing and validating data at scale. You can easily leverage GE at scale with Spark in Cloudera Data Engineering in order to complement Great Expectations' reusable declarative expectations with advanced Spark and Airflow Job observability, monitoring, and development capabilities provided by CDE.
10-23-2023
06:29 PM
Objective

CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Apache Sedona™ (formerly known as "GeoSpark") is a cluster computing system for processing large-scale spatial data. Sedona extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed Spatial Datasets and Spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines.

Sedona jobs can run in CDE with minor configuration changes. This article presents a simple workflow to help you get started with Spark geospatial use cases in CDE. All code, dependencies, and CLI commands are available for reuse in this git repository.

Requirements

The following are required to reproduce the demo in your CDE Virtual Cluster:

- CDE Service version 1.19 and above
- A working installation of the CDE CLI. Instructions to install the CLI are provided here.
- A working installation of git on your local machine. Please clone this git repository and keep in mind that all commands assume they are run in the project's main directory.

Custom Docker Runtime as CDE Resource

By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE Resources can be of three types: Files, Python Environment, and Custom Docker Runtime.

In our case, we will create a Custom Docker Runtime to ensure Sedona and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.

The below steps have already been executed, and the image is available for reuse in my public Docker registry. If you want, create your own image by replacing my Docker username and running the following commands from your terminal:

docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-002 . -f Dockerfile
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003

Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:

cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-sedona-geospatial-pauldefusco --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 --image-engine spark3 --type custom-runtime-image

CDE Files Resources for Datasets

Sedona is compatible with many geospatial file formats. In our example, we will load geospatial countries data to a CDE Files Resource so it can be read by the CDE Spark Job. The "cde resource upload-archive" command allows you to bulk upload and extract a zipped folder:

cde resource create --name countries_data
cde resource upload-archive --name countries_data --local-path data/ne_50m_admin_0_countries_lakes.zip

CDE Files Resources for Job Code

Next, create another CDE Files Resource to store your code and dependencies in your Virtual Cluster for reuse. Before running the below commands, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP Administrator.

cde resource create --name job_code
cde resource upload --name job_code --local-path code/geospatial_joins.py --local-path code/geospatial_rdd.py --local-path code/parameters.conf --local-path code/utils.py

Geospatial RDD Spark Job in CDE

The Sedona RDD allows you to store geospatial data with custom data types in a Spark RDD. You can then use these for performing geospatial queries. In this section, we will create a CDE Spark Job to manipulate Sedona RDDs.

First, create the CDE Spark Job with the following command. Notice that the CDE CLI allows you to specify multiple CDE Files Resources along with a prefix. The prefix can be used within the Python script to reference data or files in one or more CDE Files Resources. In this CLI command we reference the job_code and countries_data Files Resources to, respectively, locate the script file and access the data in geospatial format. Also notice that the CDE Custom Docker Runtime is referenced with the "runtime-image-resource-name" parameter. Finally, at the time of this writing Sedona 1.5 is the latest version and, similarly to a spark-submit, we use the "packages" parameter to load the Maven packages into our CDE Spark Job.

cde job create --name geospatial_rdd --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 --application-file jobCode/geospatial_rdd.py

Next, open "geospatial_rdd.py" in your editor. Familiarize yourself with the code and notice the following.

The SparkSession is created and then passed to the SedonaContext object. From then on, all Spark SQL queries run in the SedonaContext:

spark = SparkSession \
    .builder \
    .appName("IOT DEVICES LOAD") \
    .config("spark.kubernetes.access.hadoopFileSystems", data_lake_name) \
    .getOrCreate()

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(spark)
sc = sedona.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")

Data is read from the CDE Files Resource using the CDE CLI alias.
A SpatialRDD is created from it:

countries_rdd = ShapefileReader.readToGeometryRDD(sc, "/app/mount/countriesData")

Sedona allows you to store data in Cloud Storage in GeoJSON format:

countries_rdd.saveAsGeoJSON(data_lake_name + geoparquetoutputlocation + "/countries.json")

A Spark Dataframe can be created from a SpatialRDD using the Sedona Adapter:

countries_df = Adapter.toDf(countries_rdd, sedona)

You can generate geospatial POINT data with your favorite util as shown in utils.py. The data can then be transformed into Sedona POINT data using Sedona SQL:

dg = DataGen(spark, username)
iot_points_df = dg.iot_points_gen(row_count = 100000, unique_vals=100000)
iot_points_df.createOrReplaceTempView("iot_geo_tmp")
iot_points_geo_df = sedona.sql("SELECT id, device_id, manufacturer, event_type, event_ts, \
    ST_Point(CAST(iot_geo_tmp.latitude as Decimal(24,20)), \
    CAST(iot_geo_tmp.longitude as Decimal(24,20))) as arealandmark \
    FROM iot_geo_tmp")

Sedona allows you to spatially partition and index your data:

iotSpatialRDD.spatialPartitioning(GridType.KDBTREE)

We can use the SpatialRDD to run geospatial queries on our data; for example, we calculate the distance between each POINT object and a predefined point, e.g. Point(21, 52), in our query:

iotSpatialRDD.rawSpatialRDD.map(lambda x: x.geom.distance(Point(21, 52))).take(5)

Finally, we run a KNN query on the same data to obtain the k POINT objects that lie closest to the provided POINT object:

result = KNNQuery.SpatialKnnQuery(iotSpatialRDD, Point(-84.01, 34.01), 5, False)

Run the Job with the CLI:

cde job run --name geospatial_rdd --executor-cores 2 --executor-memory "4g"

Geospatial Join Spark Job in CDE

Similarly to the above, create a "geospatial_joins" CDE Spark Job using the CLI:

cde job create --name geospatial_joins --application-file jobCode/geospatial_joins.py --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2

Then open "geospatial_joins.py" and notice that the synthetic IOT data is joined with the countries data loaded from a CDE Files Resource. The data is loaded from Cloud Storage in GeoJSON format:

from sedona.core.formatMapper import GeoJsonReader

geo_json_file_location = data_lake_name + geoparquetoutputlocation + "/iot_spatial.json"
saved_rdd_iot = GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)

Using Sedona SQL, the data is joined with the "ST_Contains" spatial predicate. Sedona allows us to join the two tables by matching POINT objects from the IOT dataset located within the boundaries of the POLYGON objects used to define country shapes in the Countries dataset:

GEOSPATIAL_JOIN = """
    SELECT c.geometry as country_geom, c.NAME_EN, a.geometry as iot_device_location, a.device_id
    FROM COUNTRIES_{0} c, IOT_GEO_DEVICES_{0} a
    WHERE ST_Contains(c.geometry, a.geometry)
    """.format(username)

result = sedona.sql(GEOSPATIAL_JOIN)

Run the Job with the CLI:

cde job run --name geospatial_joins --executor-cores 2 --executor-memory "4g"

Summary

This article showcased a simple geospatial use case with Apache Sedona. You can easily run Sedona Spark jobs in Cloudera Data Engineering to execute geospatial workloads at scale.
10-20-2023
07:01 PM
1 Kudo
What is Cloudera Data Engineering

CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Typically, CDE users leverage the CLI and API to accomplish tasks such as creating, running, and orchestrating Spark and Airflow Jobs, uploading files such as Python wheel files, Jars, etc. to the Virtual Cluster, and creating reusable Python environments for Spark and Airflow Jobs.

CDEPY is a package that allows you to do all of the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or a 3rd party tool, as long as it supports Python. It is available on PyPi at this URL and can be easily installed with a "pip install cdepy" command. At the time of this writing, the latest version is 0.1.5. In the rest of this article we will walk through some basic steps for getting started with the package.

Package Imports

from cdepy import cdeconnection
from cdepy import cdejob
from cdepy import cdemanager
from cdepy import cderesource

CDECONNECTION Module

The cdeconnection module allows you to establish a connection to a CDE Virtual Cluster. To instantiate a connection object, you will need a JOBS_API_URL (available in the CDE Virtual Cluster Service Details page) and your CDP Workload User and Password.

JOBS_API_URL = "https://<YOUR-CLUSTER>.cloudera.site/dex/api/v1"
WORKLOAD_USER = "<Your-CDP-Workload-User>"
WORKLOAD_PASSWORD = "<Your-CDP-Workload-Password>"

myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)

Next, you will need to obtain a Token to connect to the cluster. This can be achieved with the following method. Once this is run, the Token is applied to the Connection object.

myCdeConnection.setToken()

CDERESOURCE Module

The cderesource module allows you to create CDE Resources. If you are new to CDE, a CDE Resource can be of type "Files", "Python", or "Custom Docker Runtime". Mirroring this, the cderesource module allows you to create two types of objects: CdeFilesResource and CdePythonResource (support for Custom Docker Resources will be available in a future release).

CDE_RESOURCE_NAME = "myFilesCdeResource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()

In both cases, the object is just a definition of the CDE Resource; the actual Resource will be created with the cdemanager module. Before we get there, though, we have to create a definition for a CDE Job.

CDEJOB Module

The cdejob module allows you to create Job Definitions of type Spark and Airflow. The definition includes important parameters such as Spark resource configurations (e.g. driver/executor memory and driver/executor cores) and Airflow DAG configurations (e.g. DAG schedule interval).

Below we create a definition for a CDE Job of type Spark. The job in the CDE Jobs UI will be named myCdeSparkJob and will use the local pysparksql.py script (a minimal example of such a script is sketched at the end of this article).

CDE_JOB_NAME = "myCdeSparkJob"
APPLICATION_FILE_NAME = "pysparksql.py"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME)

Just like in the case of the Resource Definition object, the job has not yet been created in CDE. Next, we will use the cdemanager module to create the Resource and Job in CDE.

CDEMANAGER Module

First, the CdeClusterManager object is instantiated with the CdeConnection object:

myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)

Next, the Resource is created with the CdeFilesResourceDefinition object we created above:

myCdeClusterManager.createResource(myCdeFilesResourceDefinition)

Now that the Resource has actually been created in CDE, we can upload our PySpark script to it:

LOCAL_FILE_PATH = "examples"
LOCAL_FILE_NAME = "pysparksql.py"

myCdeClusterManager.uploadFile(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)

Finally, we are now ready to create the CDE Spark Job. The Spark Job will recognize the CDE Files Resource we just created because we set the name of the Resource in the CdeSparkJobDefinition object earlier.

myCdeClusterManager.createJob(myCdeSparkJobDefinition)

The CdeClusterManager object allows you to run a Job and monitor its results:

myCdeClusterManager.runJob(CDE_JOB_NAME)
myCdeClusterManager.listJobRuns()

SUMMARY

In the above workflow, we used CDEPY and Python to create a CDE Files Resource and Spark Job, run the job, and monitor its results. More generally, if you use Spark and/or Airflow in Cloudera Data Engineering, you can leverage the CDE API with Python to create your own framework. This allows you to integrate your existing Spark and Airflow applications with CDE and/or build cluster and job observability applications based on your business requirements.
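For completeness, the pysparksql.py application uploaded above could be as simple as the following hypothetical script; the actual file in the examples folder may differ.

# pysparksql.py - hypothetical minimal PySpark application
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CDEPY Example").getOrCreate()
spark.sql("SHOW DATABASES").show()
spark.stop()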
09-07-2023
12:06 PM
Pandas is a fast, powerful, and flexible Python data analysis and manipulation tool. It has gained incredible popularity in the Data Science community as one of the most important packages for data wrangling and preparation. With the Pandas DataFrame at the core of its API, it is very simple to use and performant on relatively small datasets.

Apache Spark, on the other hand, is an open-source, distributed processing system used for big data workloads. It has also gained extreme popularity as the go-to engine for prototyping and deploying Data Engineering and Machine Learning pipelines at scale. Due to the maturity and reliability that the Spark project has achieved in recent years, it is widely used for production use cases in the enterprise.

Like Pandas, Spark features an API with the DataFrame as a foundational data structure for analyzing data at scale. However, while it is dramatically more performant at scale than Pandas, it has never been quite as easy and intuitive to use. The Koalas project was created to address this gap. The idea was simple: provide users with the ability to run their existing Pandas code on Spark. With Spark 3.2, the project has been fully incorporated into PySpark as the "Pandas API on Spark".

Cloudera Machine Learning (CML) is Cloudera's cloud-native machine learning service, built for CDP. The CML service provisions clusters, also known as ML workspaces, that run natively on Kubernetes. ML workspaces support fully containerized execution of Python, R, Scala, and Spark workloads through flexible and extensible engines. The Pandas on Spark API is a common choice among CML users.

The attached notebook provides a simple Quickstart for a CML Session. To use it in CML you must have:

- A CML Session with the Python Kernel and the Spark Add-on enabled (Spark version 3.2 or above only).
- Pandas and PyArrow installed.

The notebook in this Git repository contains the full CML Quickstart. Here are some important recommendations.

Even though Pandas on Spark does not require a SparkSession or SparkContext object, use the CML Spark Data Connection to launch a SparkSession object and set custom configurations. For example, this will allow you to read Iceberg tables into a Pandas on Spark DataFrame with a single line of code.

import cml.data_v1 as cmldata
import pyspark.pandas as ps
CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
df_from_sql = ps.read_table('default.car_sales')

When you load a table from the Catalog, set the 'compute.default_index_type' option to 'distributed' to ensure your data is not folded into a single Spark partition:

ps.set_option('compute.default_index_type', 'distributed')
df_from_sql_distributed = ps.read_table('default.car_sales')

The same Spark performance concepts, such as trying to avoid shuffles, apply to Pandas on Spark. For example, you should leverage checkpointing when reusing the same DataFrame repeatedly, shuffle with caution, and review the Spark plan when needed:

df_from_sql_distributed.spark.explain()
== Physical Plan ==
*(1) Project [monotonically_increasing_id() AS __index_level_0__#1002L, customer_id#992L, model#993, saleprice#994, sale_date#995, vin#996]
+- *(1) Scan HiveAcidRelation(org.apache.spark.sql.SparkSession@46e4ace2,default.car_sales,Map(transactional -> true, numFilesErasureCoded -> 0, bucketing_version -> 2, transient_lastDdlTime -> 1679572954, serialization.format -> 1, transactional_properties -> insert_only, table -> default.car_sales)) [customer_id#992L,model#993,saleprice#994,sale_date#995,vin#996] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<customer_id:bigint,model:string,saleprice:double,sale_date:string,vin:string>

df_from_sql_distributed = df_from_sql_distributed.spark.local_checkpoint()
df_from_sql_distributed.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#1002L, customer_id#992L, model#993, saleprice#994, sale_date#995, vin#996]
+- *(1) Scan ExistingRDD[__index_level_0__#1002L,customer_id#992L,model#993,saleprice#994,sale_date#995,vin#996,__natural_order__#1009L]

Plotting with a PySpark DataFrame can be a challenge and often requires collecting into a Pandas DataFrame. With Pandas on Spark you can call the plotly-backed "plot()" method directly on a DataFrame (a short runnable sketch follows this summary):

psdf[['Fee','Discount']].plot.area()

In summary, the Pandas API on Spark offers the following benefits:

- You can run your Pandas code faster.
- You can use the Pandas API with the distributed horsepower of Spark.
- You can have a single codebase for everything: small data and big data.
- The same Pandas syntax is compatible with Dask, so you can even more easily choose the engine to run it on.

When deployed on CML, the Pandas on Spark API is easy to use:

- CML Sessions allow you to easily deploy resources in Kubernetes to execute ML workloads.
- CML Runtimes allow you to switch between programming languages, editors, and preloaded libraries with agility. Among these benefits, you can pick which version of Spark to use on the fly. In this example we used Spark 3.2.
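As a short, self-contained illustration of the plotting call referenced above, the snippet below builds a small, hypothetical pandas-on-Spark DataFrame in place of psdf; any DataFrame loaded with ps.read_table() works the same way.

import pyspark.pandas as ps

# Hypothetical data standing in for a table loaded from the Catalog
psdf = ps.DataFrame({'Fee': [20000, 25000, 22000, 30000], 'Discount': [1000, 2500, 1500, 2000]})
psdf[['Fee', 'Discount']].plot.area()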
08-30-2023
05:07 PM
Objective

This article provides a Quickstart for using XGBoost with Spark in Cloudera Machine Learning. CML users who are looking to test a distributed version of XGBoost are welcome to use the code in their environments. The notebook is available in this Git repository.

Cloudera Machine Learning

Cloudera Machine Learning (CML) on the Cloudera Data Platform increases data scientists' productivity by providing a single, unified platform that is all-inclusive for powering any AI use case. CML is purpose-built for agile experimentation and production ML workflows. It is designed to manage everything from data preparation to MLOps and predictive reporting. For more information on CML, please visit this page.

XGBoost and Spark

In Machine Learning, Decision Tree methods are a type of Supervised Learning model. They have been used for decades in both classification and regression tasks. There are many types; generally, they are constructed by identifying ways to split data into hierarchical structures. Data is split into leaf nodes from the top of the tree by using features in order of predictive importance. Decision trees are highly interpretable but are known to suffer from a tendency to overfit the data.

Ensemble methods combine multiple learners to increase model performance. Anyone can create one by combining existing models for a given task, but typically they are applied to Decision Trees. Ensemble methods such as Random Forests, bagging, and Gradient Boosting iteratively reward good base learners to ultimately yield models with better accuracy and lower variance.

XGBoost stands for Extreme Gradient Boosting and is a scalable, distributed gradient-boosted decision tree (GBDT) machine learning library. It is very popular among data scientists because of its consistent results across many Kaggle competitions and research projects in academia.

Apache Spark is a powerful open-source engine for big data processing and analytics. Combining XGBoost and Spark allows you to leverage the model performance gains provided by the former while distributing the work with the latter. This can dramatically improve the quality and performance of your Machine Learning models.

Using the example in CML

Before launching the notebook in a CML Session, we recommend ensuring that the "Enable CPU Bursting" workspace option is set in the "Site Administration" -> "Runtime" page (only CML Admins have access to it). This configuration controls whether CML pod specifications for CML Sessions receive a resource limit. In other words, when it is enabled, the Spark Executor cores and memory are not limited by the CML Session Resource Profile.

Next, launch a CML Session with the following options and resource profile:

Editor: JupyterLab
Kernel: Python 3.8 or above
Edition: Standard
Version: any version ok
Enable Spark: Spark 3.2.0 and above ok
Resource Profile: 2vCPU / 4GiB Memory - 0 GPU

Then open the terminal and install the project requirements with:

pip3 install -r requirements.txt

In the notebook, ensure you update the SparkSession properties in the second cell according to your CDP Environment. If your CML Workspace is in AWS, you will have to set both "spark.hadoop.fs.s3a.s3guard.ddb.region" and "spark.kerberos.access.hadoopFileSystems". If your CML Workspace is in Azure, OCP, or Cloudera ECS, you only need to set "spark.kerberos.access.hadoopFileSystems" and can remove the other property. The correct values for both options are available in the CDP Management Console under Environment Configurations. If you have issues finding these, please contact your CDP Administrator.

No other changes are required. You can select "Run All Cells" from the "Run" option in the JupyterLab menu and follow along as the code outputs are populated in each cell.

Code Walk-Through

This section will highlight the most important code in the notebook.

Cell 1: The "SparkXGBClassifier" class is imported from the "xgboost.spark" module. XGBoost also provides a "SparkXGBRegressor" class for regression tasks.

from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.linalg import Vectors

Cell 2: The SparkSession object is created. The "xgboost.spark" module requires disabling Spark Dynamic Allocation. Therefore, we set two Executors with basic memory and core configurations.

spark = SparkSession\
.builder\
.appName("SparkXGBoostClassifier Example")\
.config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")\
.config("spark.kerberos.access.hadoopFileSystems","s3a://go01-demo")\
.config("spark.dynamic allocation.enabled", "false")\
.config("spark.executor.cores", "4")\
.config("spark.executor.memory", "4g")\
.config("spark.executor.instances", "2")\
.config("spark.driver.core","4")\
.config("spark.driver.memory","4g")\
.getOrCreate()

Cell 3: The code below prints the URL to reach the Spark UI in CML. Uncomment and run the cell, then open the URL provided in the output to follow along in the Spark UI.

import os
print("https://spark-"+os.environ["CDSW_ENGINE_ID"]+"."+os.environ["CDSW_DOMAIN"]) Cell 4 and 6: Two basic Spark Dataframes are created as training and test data. df_train = spark.createDataFrame([
(Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
(Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
], ["features", "label", "isVal", "weight"])
df_test = spark.createDataFrame([
(Vectors.dense(1.0, 2.0, 3.0), ),
], ["features"]) Cell 7: An object of the "SparkXGBClassifier" class is instantiated with default hyperparameters. Notice the "num_workers" option is set to 2. This value should be set to the number of Executors you want to distribute your SparkXGBoost Application across. xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
validation_indicator_col='isVal', weight_col='weight',
early_stopping_rounds=1, eval_metric='logloss', num_workers=2)

Cell 8: The classifier is trained on the training data.

xgb_clf_model = xgb_classifier.fit(df_train)

Cell 9: The classifier is used to run inference on the test data.

xgb_clf_model.transform(df_test).show()

Classifier sample prediction on the test data:

+-------------+-------------+----------+-----------+
| features|rawPrediction|prediction|probability|
+-------------+-------------+----------+-----------+
|[1.0,2.0,3.0]| [-0.0,0.0]| 0.0| [0.5,0.5]|
+-------------+-------------+----------+-----------+

Summary and Next Steps

This basic example provided a Quickstart for distributed XGBoost with PySpark in Cloudera Machine Learning. Integrating XGBoost and Spark provides data scientists with the opportunity to leverage the strengths of both. XGBoost is used to achieve high model performance. Spark is used to distribute advanced Machine Learning algorithms across large clusters of worker nodes.

If you use CML you may also benefit from the following projects:

- Telco Churn Demo: Build an end-to-end ML project in CML and increase ML explainability with the LIME library.
- Learn how to use Cloudera Applied ML Prototypes to discover more projects using MLFlow, Streamlit, Tensorflow, PyTorch, and many more.
- CSA2CML: Build a real-time anomaly detection dashboard with Flink, CML, and Streamlit.
- SDX2CDE: Explore ML Governance and Security features in SDX to increase legal compliance and enhance ML Ops best practices.
- APIv2: Familiarize yourself with APIv2, CML's go-to Python library for ML Ops and DevOps.
06-05-2023
08:45 PM
1 Kudo
The Cloudera Data Platform (CDP) is a hybrid data platform designed to deliver faster and easier data management, analytics and AI at enterprise scale. Cloudera Machine Learning (CML) is one of CDP’s Cloud Native Data Services designed to enable secure, governed Data Science.
With immediate access to enterprise data pipelines, scalable compute resources, and access to preferred tools, Data Scientists and Engineers use CML to streamline the process of getting analytic workloads into production and intelligently manage machine learning use cases and MLOps processes.
While CML Data Scientists spend most of their time prototyping and productionizing models in CML Projects, the CML Admin needs to be familiar with the CML Workspace, its basic architecture and how it allocates Cloud resources.
With this article we will share some foundational concepts in order to help CML Administrators better understand how to size Workspaces.
What is a Workspace?
CML workloads are executed within Workspaces and in turn within Projects and Teams. To the CML User, the Workspace is a high-level construct to create CML Projects, store CML Runtimes, and perform other administrative tasks such as creating Resource Profiles.
However, under the covers, the Workspace is better defined as an auto-scaling service for Data Science leveraging Kubernetes. The Kubernetes cluster runs in Cloud Virtual Machines using AKS and EKS in the Public Cloud or OCP and Cloudera ECS in the Private Cloud. The CML Administrator or Data Scientist is not required to know or handle Kubernetes in any way. CML automatically deploys and manages the infrastructure resources for you in your CDP Environment of choice.
When a Workspace is created for the first time a node is deployed to the underlying infrastructure. This is a fixed resource that is required to run at all times for a small cost.
Subsequently, when a CML User runs a workload such as a Notebook, a Model API Endpoint, or a Batch Job, the CML Workspace provisions the necessary Pod(s) thus requesting a second node from the underlying infrastructure.
As mentioned above, the auto-scaling process is fully automated and does not require any supervision. Auto-scaling events are fast and designed so that CML Users are not aware of them. Running workloads are not affected by the auto-scaling event e.g. running Sessions will continue undisturbed. If needed, any pending workloads such as new CML Sessions or previously scheduled CML Jobs will be queued automatically until new resources are deployed.
At a high level, the pods carve out resources from the node(s), which are then released when the workload is complete. Thus, the CML Customer is only charged as cloud resources are consumed and then discarded.
The CML User explicitly picks the amount of CPU, Memory and optionally GPU resources when launching the workload. This amount is called a Resource Profile (e.g. 1 CPU / 2 GiB Mem) and it is predefined by the CML Admin at the Workspace level in order to provide an approval process and prevent Data Scientists from consuming too many resources without control.
Sizing Considerations
When deploying the Workspace for the first time, the user is prompted to select an instance type and an Autoscale Range (see image below). In the Public Cloud, these are AWS or Azure instances. The Autoscale Range is simply a min and max boundary of the instances that can be deployed by the Service.
Typically, the more CPU, Memory, and GPU resources available per instance, the higher the hourly cost to run them but the more CML workloads can be deployed per instance without requiring the autoscaler to deploy an additional node.
Because a typical workload such as a Data Exploration Notebook only requires a small Resource Profile, it is not uncommon to have multiple users working concurrently within the same node and thus at a fairly limited hourly cost. This means that instance types of relatively small size can be chosen when deploying a workspace. In the event of more horsepower being required, the Workspace will simply autoscale by adding as many instances as required and allowed by the Workspace Autoscale Range.
However, if you plan on running workloads that cannot horizontally scale in a distributed fashion with frameworks such as Spark, TensorFlow, etc., then it may make sense to choose a more powerful instance type. This could be the case in Time Series Machine Learning where algorithms cannot always be distributed.
Finally, it’s important to note that CML Instance Types and autoscale ranges can be changed even after a Workspace has been deployed.
Cost Management Considerations
Instance hourly rates are publicly available on the Cloudera Pricing Site. In addition, your Cloudera Account Team can provide additional recommendations to plan and size your Workspace according to your use cases.
CML is designed to allow the Administrator to closely monitor and limit usage in order to prevent runaway cloud charges. As mentioned above, Resource Profiles are whitelisted by the CML Admin in order to prevent CML Users from requesting resources without supervision. To be specific, the CML User will only be able to launch Jobs, Sessions, Applications, etc. with the CPU/Mem/GPU profiles designated in the Runtime menu as shown below.
Furthermore, CML Users are also users at the CDP Environment level. In other words, each Workspace can grant or deny access to a particular CDP User.
Finally, within each Workspace, the CML Admin can create Quotas to directly limit a User’s maximum amount of CPU, Memory, and GPU use across all workloads at any given time. Quota consumption is only a subset of the Workspace Autoscale ranges which can be viewed as a second option for managing costs at the global level.
Using Multiple Workspaces
It is common practice to create multiple CML Workspaces as each additional Workspace can provide workload isolation and a quick second option in case of failure. CML Customers typically deploy them based on scope such as Use Case, Business Organization, or function e.g. DEV vs QA vs PROD.
The additional workspace(s) can be created in the same CDP Environment or in a separate CDP Environment. In the former case, the Workspaces will share the same SDX Data Lake and thus their users will be able to access and transform the same datasets while being governed and secured by the same Atlas and Ranger services. In the latter case, creating Workspaces in different CDP Environments will guarantee that they won’t be adversely affected in case of a failure at the CDP Environment level.
For example, the below image shows two workspaces deployed in the same CDP Environment while a third one is in a separate one. Notice the first Workspace is undergoing a change in instance types and autoscale range.
Additionally, CML supports MLFlow Registry which allows you to deploy models from one Workspace to another. As a result, multiple workspaces can support DevOps pipelines across multiple CDP Environments and even allow you to deploy models from Public to Private Cloud and vice versa (Hybrid Machine Learning).
Although each Workspace comes with a small fixed hourly charge, another advantage is that you will be able to select different instance types and autoscale ranges for each deployment which in turn could allow you to save money by enforcing stricter limitations on particular business functions or user groups.
A Sizing Exercise Example
With all these considerations in mind, we recommend you go through a similar exercise as below when planning your Workspace deployment.
Step 1: Estimate the number of CML Users and optionally whether these will be working within the same or different Teams, Use Cases, and CDP Data Lakes.
Step 2: Estimate average and peak CPU, Memory, and optionally GPU consumption per User. If planning on more than one Team, determine if the average and peak dramatically varies between them.
Step 3: Decide if you need more than one workspace. Try to group users into Teams and Use Cases as much as reasonably possible based on similarities in Data Lake Access, average and peak consumption. Other factors may include whether users need GPUs, special Ranger ACLs, and types of workloads (e.g. primarily hosting API Model Endpoints vs Exploratory Data Science in Notebooks vs Spark ETL in CML Jobs).
Step 4: Sum up all CPU, Memory, and GPU required per workspace at peak and average, then add 20%.
Step 5: Look up CPU, Memory, and GPU resources per AWS or Azure Instance types and estimate how many instances would be required to fit the sum from Step 4. Pick an Instance Type that will fit most of your average workloads with a reasonable instance count (i.e. within the 3-6 range) and your peak workloads with no more than 10 instances. If this is not possible, divide the workload further into two separate workspaces where one has the same or smaller instance types and the other has larger instance types.
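As a purely hypothetical example: a team of 20 Data Scientists averaging 2 vCPU / 8 GiB each and peaking at 4 vCPU / 16 GiB each adds up to 40 vCPU / 160 GiB on average and 80 vCPU / 320 GiB at peak; adding 20% gives roughly 48 vCPU / 192 GiB and 96 vCPU / 384 GiB. On a 16 vCPU / 64 GiB instance type, that maps to about 3 instances on average and 6 at peak, which fits comfortably within the ranges suggested in Step 5.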
Conclusions
In this article, we highlighted some of the most fundamental considerations for sizing a CML Workspace. In summary:
CML Workspaces are autoscaling Kubernetes clusters providing Workload Isolation. CML automatically deploys and manages the infrastructure resources for you and requires no knowledge or interaction with the Kubernetes resources under the hood.
When planning for the deployment of Workspaces it is important to keep in mind that multiple Workspaces can and should be deployed based on Use Case, Team, Function, and Resource Consumption estimates.
Generally, sizing a Workspace consists of an exercise of estimating average and peak consumption in terms of CML Resource Profiles and mapping the estimates to AWS or Azure Instance Types. Additional considerations such as workload type, Data Lake access and SLAs should be prioritized as decision factors.
06-05-2023
06:26 PM
In the last few years, Spark has become one of the default choices for Data Analysis, Engineering, and Machine Learning in Cloudera Machine Learning (CML). CML is the cloud-native autoscaling service in the Cloudera Data Platform (CDP) for secure and governed Machine Learning and interactive Data Analysis at enterprise scale. In this article, we will share important considerations when using Spark in CML in order to help Developers and Data Scientists in their daily usage.

CML Sessions

A CML Workspace provides an autoscaling Kubernetes cluster for running workloads such as interactive notebook Sessions, as well as Batch Jobs, Experiments, Applications, and Models. These are available in the CML Project, a construct to isolate users in their own environments. Workspace implementation details are outside of this article's scope; luckily, neither the CML User nor the CML Admin is required to know them or operate Kubernetes at all. Under the covers, CDP provides the necessary infrastructure and integrations with the Shared Data Experience so that the Workspace is as easy to deploy as the click of a button.

When the CML User launches Sessions, Jobs, or other types of workloads, the user is prompted to select a Resource Profile, a Runtime, and an Editor. The Resource Profile is the amount of CPU, Memory, and optionally GPU resources that are assigned to the Session when it is launched. The Runtime consists of all the dependencies (e.g. Pandas, Numpy, Tensorflow, etc.) that are loaded into the Session Pod by default. Finally, the user can select from predefined editors such as RStudio, JupyterLab, and VS Code, or optionally select a custom editor if one has been installed. As an alternative, the user can deploy a local IDE such as IntelliJ or VS Code and connect to CML remotely.

CML Sessions and Spark

CML beginners often ask how CML Sessions relate to Spark Sessions and Applications. The two are fundamentally different concepts. As we saw in the previous section, when a CML Session is launched, a Pod is deployed with the allotted Resource Profile. Due to CML's flexibility, Sessions can execute workloads with any open-source framework in languages such as Scala, Python, R, Julia, or Java. This is because Sessions rely on CML Runtimes, which can be thought of as a virtual machine customized to have all the necessary dependencies to access the computing cluster while keeping each project's environment entirely isolated.

Under the hood, CML deploys a container in the Kubernetes cluster with the assigned Resource Profile CPU and Memory and mounts the Runtime dependencies. Spark is just one of the different frameworks that one could use within a Session. For example, a CML User could create a Spark Application from his or her Session notebook. In this event, CML deploys the Spark Application in Client mode, requesting additional Pods from CML and the Kubernetes cluster to run the Spark Executors.

Picking a Resource Profile

It's very important to note that only the Spark Driver will run within the Session, while the Spark Executors will have their own resources. This implies that there is no need to launch a Session with a large Resource Profile when trying to run Spark Applications at scale. The Session Resource Profile should be picked primarily on the basis of the Spark Driver's necessary profile. If the user wishes to deploy more than one Spark Application from the same CML Session, these will be managed by the same Spark Driver. There is at most one Spark Driver per Session.
Overridden Spark Configurations

It is important to note that most Spark configurations are left untouched by CML. The Spark on Kubernetes public documentation should therefore serve as the primary reference when setting Spark Session configs. However, the following Spark options are overridden by CML and should never be modified by the Spark Developer:

spark.driver.host
spark.driver.port
spark.blockManager.port

So which Spark configurations should be set when using Spark in CML? Spark Dynamic Allocation is enabled by default in CML. We recommend using it when exploring datasets in interactive Sessions or running large-scale Data Engineering Spark jobs, while keeping an eye on the "spark.dynamicAllocation.maxExecutors" property to prevent runaway charges. The "spark.executor.memory", "spark.executor.instances", "spark.executor.cores", and "spark.driver.memory" properties are also very important, as they allow you to predefine the amount of resources allocated to your Spark jobs. However, unlike Spark on YARN clusters, CML provides dramatically more flexibility when it comes to deploying cloud resources: because Spark clusters in CML are ephemeral and run in isolated environments, you will have less resource contention and generally will not have to carefully evaluate the impact of your jobs on someone else's workloads. For an introduction to tuning Spark jobs and resource allocation, visit this guide.

spark = SparkSession\
.builder\
.appName("DistributedSession")\
.config("spark.dynamicAllocation.minExecutors", 1)\
.config("spark.dynamicAllocation.maxExecutors", 4)\
.config("spark.executor.memory","2g")\
.config("spark.executor.cores","8")\
.config("spark.driver.memory","2g")\
.getOrCreate()

To access data in Cloud Storage, you should set the "spark.yarn.access.hadoopFileSystems" config to your Storage location. This value is shown in the Summary tab of the CDP Environment tied to your CML Workspace in the CDP Management Console. Notice that the "spark.hadoop.fs.s3a.s3guard.ddb.region" option is no longer necessary in recent CML versions.

spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.yarn.access.hadoopFileSystems", "s3a://gd-uat/")\
.getOrCreate()

Spark in CML runs in Client mode by default. Local mode is generally not recommended unless you are prototyping a small use case in a notebook.

spark = SparkSession\
.builder\
.master("local")\
.appName("SimpleSession")\
.getOrCreate()

If using Apache Iceberg, you must select an Iceberg-compatible Runtime and set the "spark.sql.extensions", "spark.sql.catalog.spark_catalog", "spark.sql.catalog.local", "spark.sql.catalog.local.type", and "spark.sql.catalog.spark_catalog.type" options as shown below:

spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type","hadoop") \
.config("spark.sql.catalog.spark_catalog.type","hive") \
.getOrCreate()

As an alternative to the above, we recommend using Spark Data Connections to launch a Spark Session and connect directly to your Data Lake. If you use an Iceberg-compatible Spark Runtime Add-On, the Iceberg configs are set automatically. Notice that the Cloud Storage config is not required, as it is set by default.

import cml.data_v1 as cmldata
from pyspark import SparkContext
#Optional Spark Configs
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.memory', '8g')
#Boilerplate Code provided to you by CML Data Connections
CONNECTION_NAME = "go01-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()

Using Different Spark Versions

When running distributed workloads at scale in CML, Spark is generally easier to use than other frameworks. Other frameworks run in a single Session and require the Workers API to distribute runs across multiple nodes. Spark on Kubernetes, by contrast, runs in Client mode and deploys as many Executor containers as needed by default. In addition, Spark dependencies can be injected into the Session via CML Runtime Add-Ons. In simple terms, this means you can launch a Session with or without Spark at the click of a button in a form; behind the scenes, the Spark Add-On bits are loaded on the fly. An additional benefit is that the end user can pick between Spark versions when launching the CML Session through the same form, without having to install or customize any project dependencies, and can even run multiple Sessions with different Spark versions at the same time. As of the time of this writing, CML supports Spark 2.4.8 and 3.2.0. The available versions are managed at the Workspace level and depend on the Workspace version.

Deploying a SparkML Model as an API Endpoint

To support Production Machine Learning, CML features "Models", a model management tool for deploying models as API Endpoints, tracking requests and responses, and much more. Typically a CML Model deploys a script in a running Pod and requires loading all the necessary model artifacts, such as "pkl" or "joblib" files, from local or Cloud Storage. The model is wrapped by an arbitrary function that takes the Request JSON as input, processes it via the model's "predict" method (or equivalent), and finally returns a Response. While this is fairly straightforward with frameworks such as scikit-learn and Dask, SparkML models are more challenging because they require a running SparkSession, which does not lend itself well to the latency required by an API Endpoint. To work around this, a number of projects and formats have been developed, such as MLeap and PMML.

In CML, we recommend using the mlflow.spark module to create MLflow Experiments. If the Developer logs model artifacts via the MLflow Tracking API, these are stored in each Experiment Run for later access. If the Developer then registers an Experiment Run to the MLflow Registry, CML provides an option to deploy the model from the Registry to a CML API Endpoint without having to craft a script for it; MLflow automatically converts the SparkML model to the pyfunc flavor. As of the time of this writing, the MLflow Registry is in Tech Preview and we recommend reaching out to your Cloudera Account Team if you need help deploying it.
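As a minimal illustration of the Tracking step only, the following sketch trains a small SparkML pipeline and logs it with the mlflow.spark module. The experiment name, feature columns, and data are hypothetical placeholders and not part of the original article.

import mlflow
import mlflow.spark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("SparkMLExperiment").getOrCreate()

# Hypothetical training data with two numeric features and a binary label
df = spark.createDataFrame(
    [(1.0, 2.0, 0.0), (2.0, 1.0, 1.0), (3.0, 4.0, 0.0), (4.0, 3.0, 1.0)],
    ["f1", "f2", "label"]
)

assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, lr])

mlflow.set_experiment("sparkml-demo")  # hypothetical experiment name
with mlflow.start_run():
    model = pipeline.fit(df)
    mlflow.log_param("model_type", "LogisticRegression")
    # Log the fitted SparkML pipeline so the run's artifacts can later be registered
    mlflow.spark.log_model(model, "model")

From there, registering the run and deploying it from the Registry to a CML API Endpoint is handled through the CML UI as described above.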
As an alternative, we recommend using the ONNX format to serialize the SparkML model. Once converted, the model can be loaded into the CML Model Pod and wrapped in a function as explained above. This GitHub repository provides an example: first, convert the model from SparkML to ONNX as shown in this notebook, then deploy it to a CML Model with a script similar to this.

Conclusions

In this article, we highlighted some of the most frequently asked questions about using Spark in CML. In summary:

CML allows you to easily deploy small and large Spark Applications via CML Sessions.
CML supports Spark on Kubernetes with Spark versions 2.4.8 and 3.2.0. Spark version support is subject to change and depends on the CML Workspace version.
CML Sessions are different from Spark Applications. The Spark Driver runs in the CML Session, while the Spark Executors run in separate Kubernetes Pods.
Most Spark configurations in CML are not overridden, except for the three listed above, which should never be edited by the CML User.
If you want to deploy a SparkML model to an API Endpoint, do not launch a Spark Application in it. Instead, default to the MLflow Model Registry to deploy models; if you prefer not to use MLflow, convert the SparkML model to a format such as ONNX.
03-24-2021
07:20 PM
Hello, Nice tutorial, this library is fast! If anyone is running into:

java.sql.SQLExceptionPyRaisable: java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500605) Error occurred while opening a session with the server. No additional detail from the server regarding this error is available. Please ensure that the driver configuration is compatible with the server configuration. This type of error can also occur when the server is too busy to handle the request. Please try again later.

I was able to fix it by changing the httpPath parameter in the Impala connection string from "icml-data-mart/cdp-proxy-api/impala" to "cliservice", as follows:

"jdbc:impala://"+os.environ["IMPALA_HOST"]+":443/;ssl=1;transportMode=http;httpPath=cliservice;AuthMech=3;"

Hope this helps anyone!
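For context, here is a minimal sketch of assembling that JDBC URL in Python. The environment variable names and the UID/PWD parameters are assumptions for illustration and may differ from the original tutorial's setup.

import os

# Hypothetical environment variables; adjust to match your own environment
impala_host = os.environ["IMPALA_HOST"]
user = os.environ.get("WORKLOAD_USER", "")
password = os.environ.get("WORKLOAD_PASSWORD", "")

# httpPath=cliservice instead of the cdp-proxy-api path resolved the 500605 error
jdbc_url = (
    "jdbc:impala://" + impala_host + ":443/;"
    "ssl=1;transportMode=http;httpPath=cliservice;AuthMech=3;"
    "UID=" + user + ";PWD=" + password + ";"
)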
04-15-2020
12:08 PM
1 Kudo
Hi Chinedu,

This should help: https://stackoverflow.com/questions/48406304/groupby-and-concat-array-columns-pyspark

Thanks,
Paul
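For readers landing here, a minimal sketch of the approach described in that answer, assuming a DataFrame with an id column and an array column; the column names and sample data are hypothetical.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("GroupByConcatArrays").getOrCreate()

# Hypothetical data: each row holds an id and an array of values
df = spark.createDataFrame(
    [(1, ["a", "b"]), (1, ["c"]), (2, ["d", "e"])],
    ["id", "values"]
)

# Collect the array column per id, then flatten the list of arrays into a single array
result = (
    df.groupBy("id")
      .agg(F.flatten(F.collect_list("values")).alias("values"))
)
result.show(truncate=False)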