Pyspark UDF question


Contributor

Hi All,

I have defined the following function as a UDF in PySpark:

import numpy as np
import pandas as pd

def dist(x, y):
    # words() and lvd() are helper functions defined elsewhere
    a = words(x, 'event')
    b = words(y, 'proj_desc')
    # Cross join: give both frames a constant dummy key and merge on it
    a['key'] = 1
    b['key'] = 1
    ab = pd.merge(a, b, on='key')
    del ab['key']
    # Pairwise Levenshtein distance between each event and project description
    ab['lv'] = ab.apply(lambda r: lvd(r['event'], r['proj_desc']), axis=1)
    score = float(np.mean(ab.groupby('event')['lv'].max()))
    match = float(len(ab[ab['lv'] == 1]))
    sim = match / len(a)
    return (x, score, match, sim)
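The dummy-key merge in dist is just a cross join (every row of a paired with every row of b). A minimal standalone sketch of that pattern, with hypothetical data:

```python
import pandas as pd

a = pd.DataFrame({'event': ['pump', 'valve']})
b = pd.DataFrame({'proj_desc': ['pump repair', 'motor']})

# Cross join via a constant dummy key, as in dist()
a['key'] = 1
b['key'] = 1
ab = pd.merge(a, b, on='key')
del ab['key']

print(len(ab))  # 2 x 2 = 4 pairings
```

Note that the result grows as len(a) * len(b), so this can get expensive for long texts.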

The function is registered and invoked as follows:

# part_output is the UDF's return schema (defined elsewhere)
part_concat = udf(dist, part_output)

part_output_df = gl_part_proj_df.select(
    part_concat(gl_part_proj_df['part_description'], gl_part_proj_df['proj_desc']).alias("part_output")
)

My question: since I am using a pandas DataFrame inside the UDF, will it still execute on all the worker nodes, or will it be brought back to the driver?

2 Replies

Re: Pyspark UDF question

@Jayadeep Jayaraman

Yes, pandas code inside a UDF will execute on the worker nodes of your cluster, but you need to make sure that pandas (and any other required libraries) are installed on those nodes.

To accomplish this, Hortonworks and Anaconda have partnered to create Cluster Management Packs: https://www.continuum.io/blog/developer-blog/self-service-open-data-science-custom-anaconda-manageme...

This is the preferred way to manage and ship python (and R) packages within your HDP cluster.

If you want to create a python virtual environment and ship it to the nodes of your cluster, then here's a good article: https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

You need to zip up the relevant Python environment (or packages) and ship it when starting your Spark job, for example:

./bin/spark-submit --master yarn-cluster --archives your_python_env_or_packages.zip pyspark_project.py
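If you go the virtual-environment route from the article above, the full packaging-and-submit step might look something like this (a sketch; the environment name, paths, and script name are placeholders):

```shell
# Build a conda environment containing the libraries your UDF needs
conda create -y -n udf_env python pandas numpy scipy

# Zip the environment directory so it can be shipped with the job
cd ~/anaconda/envs && zip -r ~/udf_env.zip udf_env

# Ship the archive and point the executors at the shipped interpreter;
# the "#udf_env" alias is the directory name the archive is unpacked under
PYSPARK_PYTHON=./udf_env/udf_env/bin/python \
./bin/spark-submit \
  --master yarn-cluster \
  --archives ~/udf_env.zip#udf_env \
  pyspark_project.py
```

The key point is that the archive travels with the job, so every executor unpacks the same environment locally rather than relying on node-by-node installs.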

Re: Pyspark UDF question

Contributor

Thanks @Dan Zaratsian. I have already installed the SciPy toolkit on all nodes and was able to execute the code. My confusion came from some material I read stating that a pandas DataFrame can only live on the driver when we use toPandas(). Can you clarify this for me? When does pandas code run on the driver versus on the worker nodes?
