PySpark UDF question


Hi All,

I have defined the following function as a UDF in PySpark:

import pandas as pd
import numpy as np

def dist(x, y):
    # 'words' and 'lvd' are user-defined helpers (a tokenizer and a
    # Levenshtein-distance function) that must be available on every executor
    a = words(x, 'event')
    b = words(y, 'proj_desc')
    # a constant key produces the cross join of a and b
    a['key'] = 1
    b['key'] = 1
    ab = pd.merge(a, b)
    del ab['key']
    # pairwise similarity for every event/proj_desc combination
    ab['lv'] = ab.apply(lambda row: lvd(row['event'], row['proj_desc']), axis=1)
    score = float(np.mean(ab.groupby('event')['lv'].max()))
    match = float(len(ab[ab['lv'] == 1]))
    sim = match / len(a)
    return (x, score, match, sim)

The function above is executed as follows:

part_concat = udf(dist, part_output)

part_output_df = gl_part_proj_df.select(
    part_concat(gl_part_proj_df['part_description'],
                gl_part_proj_df['proj_desc']).alias("part_output")
)
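(For context: part_output passed to udf() above is the UDF's return-type schema, but its definition isn't shown in the thread. A minimal sketch of what it might look like, with field names guessed from the tuple that dist returns:)

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema matching the (x, score, match, sim) tuple returned
# by dist(); the actual definition of part_output is not shown here.
part_output = StructType([
    StructField("part_description", StringType()),
    StructField("score", DoubleType()),
    StructField("match", DoubleType()),
    StructField("sim", DoubleType()),
])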

My question: since I am using a pandas DataFrame inside the UDF, will it still execute on all the worker nodes, or will the data be brought back to the driver?


@Jayadeep Jayaraman

Yes, the pandas code inside your UDF will execute on the worker nodes within your cluster, but you need to make sure that pandas (and any other required libraries) are installed on every node.

To accomplish this, Hortonworks and Anaconda have partnered to create Cluster Management Packs: https://www.continuum.io/blog/developer-blog/self-service-open-data-science-custom-anaconda-manageme...

This is the preferred way to manage and ship Python (and R) packages within your HDP cluster.

If you want to create a Python virtual environment and ship it to the nodes of your cluster, here's a good article: https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

You need to zip up the relevant Python environment (or packages) and ship it when starting your Spark session, for example:

./bin/spark-submit --master yarn-cluster --archives your_python_env_or_packages.zip pyspark_project.py
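When shipping a zipped environment this way, the Python workers also need to be pointed at the interpreter inside the unpacked archive. A sketch of that pattern, following the article linked above (the environment and alias names here are placeholders):

# MYENV and your_python_env are placeholder names; the archive is
# unpacked on each node under the alias given after '#'
PYSPARK_PYTHON=./MYENV/your_python_env/bin/python \
./bin/spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./MYENV/your_python_env/bin/python \
  --archives your_python_env.zip#MYENV \
  pyspark_project.py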


Thanks @Dan Zaratsian. I have already installed the SciPy toolkit on all nodes and was able to execute the code, but my confusion came from some material I read which said that a pandas DataFrame can only live on the driver, when toPandas() is used. Can you clarify this for me? When does it run on the driver versus getting executed on all the nodes?
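For reference, the distinction being asked about can be sketched like this (reusing names from the snippets above; driver_pdf is an illustrative variable name):

# toPandas() collects the entire distributed DataFrame into a single
# pandas DataFrame in the driver's memory -- this is the driver-only case:
driver_pdf = gl_part_proj_df.toPandas()

# By contrast, pandas DataFrames created *inside* the UDF (as in dist()
# above) are built independently in each executor's Python worker, so
# that work stays distributed across the nodes:
part_output_df = gl_part_proj_df.select(
    part_concat(gl_part_proj_df['part_description'],
                gl_part_proj_df['proj_desc']).alias("part_output")
)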
