Pandas is a fast, powerful, and flexible Python tool for data analysis and manipulation. It has become one of the most widely used packages for data wrangling and preparation in the Data Science community. With the DataFrame at the core of its API, Pandas is simple to use and performs well on relatively small datasets.

Apache Spark, on the other hand, is an open-source, distributed processing system for big data workloads. It has become a go-to engine for prototyping and deploying Data Engineering and Machine Learning pipelines at scale. Thanks to the maturity and reliability the Spark project has achieved in recent years, it is widely used for production use cases in the enterprise.

Like Pandas, Spark features an API with the DataFrame as its foundational data structure. However, while Spark is dramatically more performant than Pandas at scale, it has never been quite as easy or intuitive to use.

The Koalas project was created to address this gap. The idea was simple: give users the ability to run their existing Pandas code on Spark. As of Spark 3.2, the project is fully incorporated into PySpark as the "Pandas API on Spark".

Cloudera Machine Learning (CML) is Cloudera’s new cloud-native machine learning service, built for CDP. The CML service provisions clusters, also known as ML workspaces, that run natively on Kubernetes. ML workspaces support fully-containerized execution of Python, R, Scala, and Spark workloads through flexible and extensible engines.

The Pandas on Spark API is a common choice among CML users. The attached notebook provides a simple Quickstart for a CML Session. To use it in CML you must have:

  • A CML Session with the Python Kernel and the Spark Add-on enabled (Spark version 3.2 or above only).
  • Pandas and PyArrow installed.
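
The prerequisites above can be checked from inside a session before running the notebook. The following is a minimal sketch, not part of the Quickstart: the helper names `major_minor` and `check_prerequisites` are hypothetical, and it assumes the three packages are importable under their usual names (`pandas`, `pyarrow`, `pyspark`).

```python
import importlib
import importlib.util
import re

MIN_SPARK = (3, 2)  # the Pandas API on Spark requires Spark 3.2 or above


def major_minor(version_text):
    """Extract (major, minor) from a version string such as '3.2.1'."""
    match = re.match(r"(\d+)\.(\d+)", version_text)
    if match is None:
        raise ValueError(f"unrecognized version string: {version_text!r}")
    return int(match.group(1)), int(match.group(2))


def check_prerequisites():
    """Return a list of problems; an empty list means everything was found."""
    problems = []
    for package in ("pandas", "pyarrow", "pyspark"):
        if importlib.util.find_spec(package) is None:
            problems.append(f"{package} is not importable")
    if importlib.util.find_spec("pyspark") is not None:
        spark_version = importlib.import_module("pyspark").__version__
        if major_minor(spark_version) < MIN_SPARK:
            problems.append(f"pyspark {spark_version} is older than 3.2")
    return problems


print(check_prerequisites() or "All prerequisites found")
```

Only the first two numeric components of the version are compared, so vendor-specific suffixes after the minor version are ignored.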

The notebook in this Git repository contains the full CML Quickstart. Here are some important recommendations:

  • Even though Pandas on Spark does not require you to create a SparkSession or SparkContext object explicitly, use the CML Spark Data Connection to launch the SparkSession and set custom configurations. For example, this allows you to read Iceberg tables into a Pandas on Spark DataFrame with a single line of code.

 

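import cml.data_v1 as cmldata
import pyspark.pandas as ps

# Launch the SparkSession through the CML Spark Data Connection
CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Read a catalog table into a Pandas on Spark DataFrame
df_from_sql = ps.read_table('default.car_sales')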

 

  • When you load a table from the Catalog, set the 'compute.default_index_type' option to 'distributed' so that building the default index does not pull your data into a single Spark partition:

 

ps.set_option('compute.default_index_type', 'distributed')
df_from_sql_distributed = ps.read_table('default.car_sales')
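
If you would rather not change the option for the whole session, it can be scoped to a single read with `ps.option_context`. This is a sketch under assumptions: the helper `read_table_with_index` is hypothetical, and it assumes the default index is attached when the table is read, so later operations that build a fresh default index would fall back to the session-wide setting.

```python
VALID_INDEX_TYPES = ("sequence", "distributed-sequence", "distributed")


def read_table_with_index(table_name, index_type="distributed"):
    """Read a catalog table with a temporary default index type."""
    if index_type not in VALID_INDEX_TYPES:
        raise ValueError(
            f"index_type must be one of {VALID_INDEX_TYPES}, got {index_type!r}"
        )
    # Imported here so the argument check above works even without Spark.
    import pyspark.pandas as ps

    # The option reverts to its previous value when the block exits.
    with ps.option_context("compute.default_index_type", index_type):
        return ps.read_table(table_name)
```

In a CML Session with Spark available, `read_table_with_index('default.car_sales')` would be the scoped equivalent of the two lines above.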

 

  • The usual Spark performance concepts, such as avoiding unnecessary shuffles, also apply to Pandas on Spark. For example, checkpoint a DataFrame that you use repeatedly, shuffle with caution, and review the Spark plan when needed:

 

df_from_sql_distributed.spark.explain()
== Physical Plan ==
*(1) Project [monotonically_increasing_id() AS __index_level_0__#1002L, customer_id#992L, model#993, saleprice#994, sale_date#995, vin#996]
+- *(1) Scan HiveAcidRelation(org.apache.spark.sql.SparkSession@46e4ace2,default.car_sales,Map(transactional -> true, numFilesErasureCoded -> 0, bucketing_version -> 2, transient_lastDdlTime -> 1679572954, serialization.format -> 1, transactional_properties -> insert_only, table -> default.car_sales)) [customer_id#992L,model#993,saleprice#994,sale_date#995,vin#996] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<customer_id:bigint,model:string,saleprice:double,sale_date:string,vin:string>
# Checkpoint locally to truncate the lineage, then inspect the plan again
df_from_sql_distributed = df_from_sql_distributed.spark.local_checkpoint()

df_from_sql_distributed.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#1002L, customer_id#992L, model#993, saleprice#994, sale_date#995, vin#996]
+- *(1) Scan ExistingRDD[__index_level_0__#1002L,customer_id#992L,model#993,saleprice#994,sale_date#995,vin#996,__natural_order__#1009L]
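
When the reuse is confined to one block of code, caching is an alternative to checkpointing. The `spark.cache()` accessor can be used as a context manager, so the cached data is released when the block exits. A minimal sketch: the helper name `count_and_total` is hypothetical, and the `saleprice` column is taken from the plan above.

```python
def count_and_total(psdf, column):
    """Run two actions against a single cached copy of the data."""
    with psdf.spark.cache() as cached:
        row_count = len(cached)        # first action, populates the cache
        total = cached[column].sum()   # second action, reads from the cache
    # The data is uncached once the with-block has exited.
    return row_count, total
```

For example, `count_and_total(df_from_sql_distributed, 'saleprice')` would scan the table once rather than twice.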

 

  • Plotting with a PySpark DataFrame can be a challenge and often requires collecting the data into a Pandas DataFrame first. With Pandas on Spark you can call the "plot" accessor directly on a DataFrame; plotly is the default plotting backend:

 

psdf[['Fee','Discount']].plot.area()

 

[Figure: plotly_img.png, the Plotly area chart of the Fee and Discount columns]

In summary, the Pandas API on Spark offers the following benefits:

  • You can run your existing Pandas code faster on large datasets.
  • You can use the Pandas API with the distributed horsepower of Spark.
  • You can have a single codebase for everything: small data and big data.
  • Much of the same Pandas syntax also works with Dask, so you can more easily choose the engine to run it on.
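
The "single codebase" point can be sketched as follows. This is an illustration, not code from the notebook: `choose_backend`, `load_backend`, and `ROW_THRESHOLD` are hypothetical names, the one-million-row cut-off is an arbitrary assumption, and the `model` and `saleprice` columns are borrowed from the car_sales table used earlier.

```python
import importlib

# Hypothetical cut-off: below this many rows, plain pandas is assumed to suffice.
ROW_THRESHOLD = 1_000_000


def choose_backend(expected_rows, threshold=ROW_THRESHOLD):
    """Return the module name of the DataFrame library to use."""
    return "pandas" if expected_rows < threshold else "pyspark.pandas"


def load_backend(expected_rows):
    """Import and return either pandas or pyspark.pandas."""
    return importlib.import_module(choose_backend(expected_rows))


def average_price_by_model(df):
    """Works unchanged on a pandas or a Pandas on Spark DataFrame."""
    return df.groupby("model")["saleprice"].mean()
```

The analysis function uses only calls that exist in both libraries, so the choice of engine is made once, at import time, and the rest of the code stays the same.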

When deployed on CML, the Pandas on Spark API is easy to use:

  • CML Sessions allow you to easily deploy resources in Kubernetes to execute ML workloads.
  • CML Runtimes allow you to switch between programming languages, editors, and preloaded libraries with agility.
  • In particular, you can pick which version of Spark to use on the fly. This example used Spark 3.2.