I have defined the following function as a UDF in PySpark:
def dist(x, y):
    a = words(x, 'event')
    b = words(y, 'proj_desc')
    # Cross join: add a constant key to both frames, merge on it, then drop it
    a['key'] = 1
    b['key'] = 1
    ab = pd.merge(a, b)
    del ab['key']
    # Levenshtein-based similarity for every (event, proj_desc) token pair
    ab['lv'] = ab.apply(lambda r: lvd(r['event'], r['proj_desc']), axis=1)
    score = float(np.mean(ab.groupby('event')['lv'].max()))
    match = float(len(ab[ab['lv'] == 1]))
    sim = match / len(a)
    return (x, score, match, sim)
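Since the thread doesn't show the helper functions, here is a runnable local sketch of the same logic with stand-in implementations (both are assumptions: `words` is assumed to tokenize a string into a one-column DataFrame, and `lvd` is replaced by a toy exact-match similarity in place of the real Levenshtein-based one):

```python
import pandas as pd
import numpy as np

# Stand-in for the real words(): one token per row, column named `col`
def words(text, col):
    return pd.DataFrame({col: text.split()})

# Stand-in for the real lvd(): 1.0 on exact match, else 0.0
def lvd(a, b):
    return 1.0 if a == b else 0.0

def dist(x, y):
    a = words(x, 'event')
    b = words(y, 'proj_desc')
    # Cross join via a constant key column
    a['key'] = 1
    b['key'] = 1
    ab = pd.merge(a, b)
    del ab['key']
    ab['lv'] = ab.apply(lambda r: lvd(r['event'], r['proj_desc']), axis=1)
    score = float(np.mean(ab.groupby('event')['lv'].max()))
    match = float(len(ab[ab['lv'] == 1]))
    sim = match / len(a)
    return (x, score, match, sim)

print(dist("pump seal kit", "seal replacement kit"))
```

On these sample inputs, two of the three `event` tokens ("seal" and "kit") find an exact match, so `match` is 2.0 and both `score` and `sim` come out to 2/3.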
The function is invoked as follows:
part_concat = udf(dist, part_output)
part_output_df = gl_part_proj_df.select(
    part_concat(gl_part_proj_df['part_description'], gl_part_proj_df['proj_desc']).alias("part_output")
)
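`part_output` passed to `udf()` here must be the UDF's return type. Since `dist` returns a four-element tuple, a plausible definition would be a `StructType`; this is an assumption, as the actual schema is not shown in the thread:

```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema matching dist()'s (x, score, match, sim) return tuple;
# the thread does not show how part_output is actually defined.
part_output = StructType([
    StructField("part_description", StringType()),
    StructField("score", DoubleType()),
    StructField("match", DoubleType()),
    StructField("sim", DoubleType()),
])
```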
My question: since the function uses a pandas DataFrame internally, will it still execute on all the worker nodes, or will the data be brought back to the driver?
Yes, pandas code inside a UDF executes on the worker nodes of your cluster, but you need to make sure that pandas (and any other libraries the UDF imports) are installed on every node.
To accomplish this, Hortonworks and Anaconda have partnered to create Cluster Management Packs: https://www.continuum.io/blog/developer-blog/self-service-open-data-science-custom-anaconda-manageme...
This is the preferred way to manage and ship Python (and R) packages within your HDP cluster.
If you want to create a python virtual environment and ship it to the nodes of your cluster, then here's a good article: https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
You need to zip up the relevant Python environment (or packages) and ship it when starting your Spark session, for example:
./bin/spark-submit --master yarn-cluster --archives your_python_env_or_packages.zip pyspark_project.py
Thanks @Dan Zaratsian. I have already installed the SciPy toolkit on all nodes and was able to execute the code, but my confusion came from some material I read which said that a pandas DataFrame can only live on the driver, since toPandas() collects the data there. Can you clarify this for me? When does pandas code run on the driver versus on the worker nodes?