Spark join with UDF fails

I am trying to do a join in Spark using UDFs in the join condition, but I am getting the error shown below:

The joins work fine without UDFs. Is it possible to use UDFs in the way shown below? The UDFs work fine in select etc.

result_df = t1_df.join(t2_df, _udf1(t1_df['col1']) == _udf2(t2_df['col1']), "inner")

File "/usr/hdp/", line 584, in join
File "/usr/hdp/", line 538, in __call__
File "/usr/hdp/", line 36, in deco
File "/usr/hdp/", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o79.join.
: java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.catalyst.plans.logical.Join
    at org.apache.spark.sql.DataFrame.join(DataFrame.scala:554)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
    at java.lang.reflect.Method.invoke(
    at py4j.reflection.MethodInvoker.invoke(
    at py4j.reflection.ReflectionEngine.invoke(
    at py4j.Gateway.invoke(
    at py4j.commands.AbstractCommand.invokeMethod(
    at py4j.commands.CallCommand.execute(
    at
    at



It looks like you are using the Spark Python API (PySpark). The PySpark documentation says:

join:

  • on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

So, do the columns exist on both sides of the join? Also, I wonder whether you could build the join condition separately and then pass it to the join() method, like this:

>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer')

Maybe this is the issue, since I am using v1.6.