spark join with udf fails

New Contributor

Hi,

I am trying to do a join in Spark using udfs in the join condition, but I am getting the error shown below.

The joins work fine without udfs. Is it possible to use udfs in the manner below? The udfs work fine in select etc.

result_df = t1_df.join(t2_df, _udf1(t1_df['col1']) == _udf2(t2_df['col1']), "inner")

  File "/usr/hdp/2.3.4.14-9/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 584, in join
  File "/usr/hdp/2.3.4.14-9/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/hdp/2.3.4.14-9/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/hdp/2.3.4.14-9/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o79.join.
: java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.catalyst.plans.logical.Join
    at org.apache.spark.sql.DataFrame.join(DataFrame.scala:554)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

2 REPLIES

@xrcs blue

It looks like you are using the Spark Python API. The pyspark documentation says:

join :

  • on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

So, do the columns exist on both sides of the join? Also, I wonder whether you can build the condition separately and then pass it to the join() method, like this:

>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer')

New Contributor

Maybe it is this issue, since I am using v1.6:

https://issues.apache.org/jira/browse/SPARK-12981
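
Since the udfs reportedly work fine in select, one thing that might be worth trying is to compute the udf results as ordinary columns first and then join on those columns, so the join condition itself contains no udf. The sketch below is untested on Spark 1.6 and is not confirmed anywhere in this thread as a fix. The names normalize_left, normalize_right, join_key_left, join_key_right and join_on_precomputed_keys are hypothetical stand-ins (the real _udf1 and _udf2 were not posted), and string keys are assumed.

```python
def normalize_left(value):
    """Hypothetical stand-in for the logic behind _udf1: trim and lower-case."""
    return None if value is None else value.strip().lower()


def normalize_right(value):
    """Hypothetical stand-in for the logic behind _udf2: also drop hyphens."""
    return None if value is None else value.strip().lower().replace("-", "")


def join_on_precomputed_keys(t1_df, t2_df):
    """Join two DataFrames on udf results that were materialized as columns.

    Assumes both inputs have a string column named 'col1', as in the question.
    """
    # Imported here so the plain helpers above can be tried without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    _udf1 = udf(normalize_left, StringType())
    _udf2 = udf(normalize_right, StringType())

    # Evaluate each udf in a projection, which is the usage reported to work.
    left = t1_df.withColumn("join_key_left", _udf1(t1_df["col1"]))
    right = t2_df.withColumn("join_key_right", _udf2(t2_df["col1"]))

    # The join condition now compares two plain columns.
    return left.join(right, left["join_key_left"] == right["join_key_right"], "inner")
```

Usage would be result_df = join_on_precomputed_keys(t1_df, t2_df). The key columns get different names on each side so the condition is unambiguous; they can be dropped afterwards if not needed.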
