spark join with udf fails

Hi,

I am trying to do a join in Spark using UDFs in the join condition, but I am getting the error shown below.

The joins work fine without UDFs. Is it possible to use UDFs in the manner below? The UDFs work fine in select, etc.

result_df = t1_df.join(t2_df, _udf1(t1_df['col1']) == _udf2(t2_df['col1']), "inner")

File "/usr/hdp/2.3.4.14-9/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 584, in join
File "/usr/hdp/2.3.4.14-9/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/hdp/2.3.4.14-9/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/usr/hdp/2.3.4.14-9/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o79.join.
: java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.catalyst.plans.logical.Join
    at org.apache.spark.sql.DataFrame.join(DataFrame.scala:554)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Re: spark join with udf fails

@xrcs blue

It looks like you are using the Spark Python API. The PySpark documentation says:

join :

  • on – a string for join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

So, do the join columns exist on both sides of the join? Also, could you build the join condition separately and then pass it to the join() method, like this:

>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer')

Re: spark join with udf fails

This may be the following issue, since I am using Spark 1.6:

https://issues.apache.org/jira/browse/SPARK-12981
