CDP CDE Jobs UI: Providing a custom Python module for PySpark UDFs

New Contributor

Hi. I'm trying to reproduce a typical edge-node submission pattern for PySpark jobs using the CDE Jobs UI. To provide a module with custom Python functions that are declared as UDFs, one can specify:

spark_session.sparkContext.addPyFile("python_utils.py")

or use the --py-files argument with spark-submit: 

spark-submit --py-files python_utils.py pyspark_main.py

 

Pure Python functions can then be imported in this way:

 

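import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def add_udf_column(df):
  # Imported here so the module is only looked up when the function runs
  from python_utils import python_func
  python_udf = F.udf(python_func, StringType())
  df = df.withColumn("udf_column", python_udf(df["src_column"]))
  return df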
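
The post does not show python_utils.py itself. As a purely hypothetical sketch, a module of this kind might contain a plain Python function like the one below. The body of python_func is an assumption for illustration only; the real function could do anything that returns a string.

```python
# python_utils.py (hypothetical contents; the original post does not show this file)

def python_func(value):
    """Return a trimmed, upper-cased copy of the input, or None if it is missing."""
    if value is None:
        # Spark passes SQL NULLs to Python UDFs as None, so handle them explicitly.
        return None
    return str(value).strip().upper()
```

Keeping the function free of Spark imports means it can be unit-tested without a cluster and wrapped with F.udf only where it is needed.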

 

When I attempt something similar using the CDE Jobs UI, Spark cannot find the custom module. My settings are:

Application File: pyspark_main.py

Arguments: --py-files python_utils.py

Advanced Options:

Python, Egg, Zip files: Added python_utils.py

 

The error I'm getting is:

ModuleNotFoundError: No module named 'python_utils' 

 

Any thoughts on how I should provide this file? Thanks.

 

Edit: Testing spark-submit on CDH 6.2, it seems that the --py-files flag must be placed before the main script, since spark-submit passes everything after the main script to the application as its own arguments. If the flag is placed afterwards, the job fails with the ModuleNotFoundError shown above.
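
One way to confirm this from inside the job is to inspect the arguments the main script actually receives. The snippet below is a minimal diagnostic sketch, not part of the original post: the helper name find_misplaced_submit_flags and the list of flags it checks are assumptions chosen for illustration.

```python
import sys

# A few flags that spark-submit itself consumes (illustrative, not exhaustive).
# If one of these appears in the application's own argv, it was placed after
# the main script and was never seen by spark-submit as an option.
SUBMIT_ONLY_FLAGS = ("--py-files", "--files", "--jars", "--conf")

def find_misplaced_submit_flags(argv):
    """Return the application arguments that begin with a spark-submit flag."""
    app_args = argv[1:]  # argv[0] is the path of the script itself
    return [arg for arg in app_args if arg.strip().startswith(SUBMIT_ONLY_FLAGS)]

if __name__ == "__main__":
    misplaced = find_misplaced_submit_flags(sys.argv)
    if misplaced:
        print("Received as application arguments, not spark-submit options:",
              misplaced, file=sys.stderr)
```

Placed at the top of pyspark_main.py, a check like this would report the flag if it arrives as an ordinary application argument.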

 

From the CDE logs it looks like the API is placing the flag after the reference to the main script. From driver.stderr.log:

+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ '[' -z ']'
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=***.***.**.*** --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner /app/mount/pyspark_main.py '--py-files python_utils.py'

 

1 ACCEPTED SOLUTION

New Contributor

To provide a module with custom Python functions that are declared as UDFs, one must specify:

spark_session.sparkContext.addPyFile("/app/mount/python_utils.py")

 

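This file should be included in a resource attached to the job. The driver log in the question shows job files mounted under /app/mount, which is why the absolute path is used here.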
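
As a rough sketch of how this could be made more defensive, the helper below checks that the file is present under the mount directory before registering it. The function name add_mounted_py_file and the mount_dir parameter are hypothetical; only sparkContext.addPyFile and the /app/mount path come from this thread.

```python
import os

def add_mounted_py_file(spark_session, filename, mount_dir="/app/mount"):
    """Register a module from the job's mounted resource directory.

    Raises FileNotFoundError early if the file is missing, which is easier
    to diagnose than a ModuleNotFoundError raised later inside a UDF.
    """
    path = os.path.join(mount_dir, filename)
    if not os.path.isfile(path):
        raise FileNotFoundError(
            f"{path} not found; is the file part of a resource attached to the job?"
        )
    # Ship the file to the executors and add it to the Python path.
    spark_session.sparkContext.addPyFile(path)
    return path
```

It would be called once, before any UDF that imports the module is defined, for example add_mounted_py_file(spark_session, "python_utils.py").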

 

See this post for further examples:

https://blog.cloudera.com/managing-python-dependencies-for-spark-workloads-in-cloudera-data-engineer...

 


2 REPLIES

Super Collaborator

Greetings @stephen_obrien 

 

Thanks for using Cloudera Community. We see that your team is working with our Support team on this issue. We will update this post based on the outcome of that engagement.

 

Regards, Smarak

