I have a UDF that returns a tuple object:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    stringSchema = StructType([
        StructField("fixedRoute", StringType(), False),
        StructField("accuracy", IntegerType(), False)])

    def stringClassifier(x, y, z):
        # ... do some code
        return (value1, value2)

    stringClassifier_udf = udf(stringClassifier, stringSchema)

I use it in a dataframe like this:

    df = df.select(['route', 'routestring', stringClassifier_udf(x, y, z).alias('newcol')])

This works fine. I later split that tuple into two distinct columns. However, the UDF does some string matching and is somewhat slow, as it collects to the driver and then filters through a 10k-item list to match a string (it does this for every row). I've been reading about pandas_udf and Apache Arrow and was curious whether this same function could run as a pandas_udf, and whether that would improve performance. I think my hangup is that the return value of the UDF is a tuple. Here is my attempt:

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    stringSchema = StructType([
        StructField("fixedRoute", StringType(), False),
        StructField("accuracy", IntegerType(), False)])

    @pandas_udf(stringSchema)
    def stringClassifier(x, y, z):
        # ... do some code
        return (value1, value2)

Of course, this gives me errors. I've also tried decorating the function with:

    @pandas_udf('list', PandasUDFType.SCALAR)

My error looks like this:
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(fixedRoute,StringType,false),StructField(accuracy,IntegerType,false))) is not supported
Any idea if there is a way to make this work?
It looks like you are using a scalar pandas_udf, which doesn't currently support returning structs. I believe the return type you want is an array of strings, which is supported, so this should work. Try this:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("array<string>")
    def stringClassifier(x, y, z):
        # return a pandas Series of lists of strings, the same length as the input; for example:
        s = pd.Series([[u"a", u"b"]] * len(x))
        return s

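To make that contract concrete: Spark calls a scalar pandas UDF once per batch, passing each input column as a pandas Series, and expects a Series back with exactly one element per input row. The sketch below exercises only the pandas side, so it runs without a Spark session. It assumes Python 3; classify_row is a hypothetical placeholder for your own per-row logic, and classify_batch is the function you would hand to pandas_udf.

```python
import pandas as pd


def classify_row(x, y, z):
    # Hypothetical per-row logic: return a two-item list of strings,
    # e.g. [matched_value, score]. Replace with your own matching code.
    return [u"{}-{}-{}".format(x, y, z), u"0"]


def classify_batch(x: pd.Series, y: pd.Series, z: pd.Series) -> pd.Series:
    # Each argument holds one batch of values for one input column.
    # Build exactly one result per row so the output length always
    # matches the input length.
    results = [classify_row(a, b, c) for a, b, c in zip(x, y, z)]
    return pd.Series(results)


batch_x = pd.Series([u"r1", u"r2", u"r3"])
batch_y = pd.Series([u"a", u"b", u"c"])
batch_z = pd.Series([u"p", u"q", u"s"])
out = classify_batch(batch_x, batch_y, batch_z)
print(len(out))     # 3, one list per input row
print(out.iloc[0])  # ['r1-a-p', '0']
```

On Spark 3.x, pandas_udf(classify_batch, "array<string>") should infer the scalar (Series to Series) variant from the pd.Series type hints; on Spark 2.3/2.4, scalar is already the default function type.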
If you are using Python 2, make sure your strings are unicode; otherwise they might be interpreted as bytes. Hope that helps!
Hey Bryan, thanks so much for taking the time! I think I'm almost there. The hint about the unicode issue helped me get past the first slew of errors. However, I now seem to be running into a length error:

    @pandas_udf("array<string>")
    def stringClassifier(lookupstring, first, last):
        lookupstring = lookupstring.to_string().encode("utf-8")
        first = first.to_string().encode("utf-8")
        last = last.to_string().encode("utf-8")
        # this part takes the 3 strings above and reaches out to another library to do a string match
        result = process.extract(lookupstring, lookup_list, limit=4000)
        match_list = [item for item in result if item.startswith(first) and item.endswith(last)]
        result2 = process.extractOne(lookupstring, match_list)
        if result2 is not None and result2 > 75:
            answer = pd.Series(list(result2))
            return answer
        else:
            fail = ["N/A", "0"]
            return pd.Series(fail)

RuntimeError: Result vector from pandas_udf was not the required length: expected 1, got 2
I'm initially passing three strings as arguments to the function, which then get passed to another library. The result is a tuple, which I convert to a list and then to a pandas Series object. How can I make a two-item array have a length of 1? I'm obviously missing some basics here.
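The length error most likely comes from treating the arguments as single strings. Each argument is a pandas Series holding a whole batch of rows (here, a one-row batch), so lookupstring.to_string() flattens the batch into one string (which also includes the index label), and the function then returns a two-element Series for a one-row batch, hence "expected 1, got 2". A minimal sketch of the fix is to loop over the rows of the batch and return one two-item list per row. Assumptions, all hypothetical: the matching library from the question isn't named, so Python's difflib.SequenceMatcher is swapped in as a stand-in 0 to 100 scorer; LOOKUP_LIST stands in for the 10k-item list; the 75 threshold and the ["N/A", "0"] fallback are taken from the question; Python 3 is assumed.

```python
import difflib

import pandas as pd

# Hypothetical stand-in for the ~10k-item lookup list from the question.
LOOKUP_LIST = [u"MAIN ST NORTH", u"MAIN ST SOUTH", u"ELM AVE"]


def score(a, b):
    # Stand-in similarity on a 0-100 scale (difflib, NOT the library
    # used in the question).
    return int(round(difflib.SequenceMatcher(None, a, b).ratio() * 100))


def classify_one(lookupstring, first, last):
    # Plain Python strings for a single row.
    candidates = [item for item in LOOKUP_LIST
                  if item.startswith(first) and item.endswith(last)]
    if not candidates:
        return [u"N/A", u"0"]
    best = max(candidates, key=lambda item: score(lookupstring, item))
    best_score = score(lookupstring, best)
    if best_score > 75:
        return [best, u"{}".format(best_score)]
    return [u"N/A", u"0"]


def string_classifier(lookupstring: pd.Series, first: pd.Series,
                      last: pd.Series) -> pd.Series:
    # One two-item list per input row, so the output length always
    # equals the batch length Spark passed in.
    return pd.Series([classify_one(s, f, l)
                      for s, f, l in zip(lookupstring, first, last)])


batch = string_classifier(pd.Series([u"MAIN ST NRTH"]),
                          pd.Series([u"MAIN"]),
                          pd.Series([u"NORTH"]))
print(len(batch))     # 1, matching the one-row input batch
print(batch.iloc[0])  # ['MAIN ST NORTH', '96']
```

Wrapped with pandas_udf(string_classifier, "array<string>"), the resulting column could then be split in Spark with col("newcol")[0] and col("newcol")[1].cast("int"). Note that a pandas UDF mainly speeds up moving data between the JVM and Python via Arrow; the per-row matching loop above still runs in Python, so the gain for this workload may be limited.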