Created 08-22-2017 01:26 PM
I have a DataFrame and want to do some manipulation of the data in a function, depending on the values of each row.
def my_udf(row):
    threshold = 10
    if row.val_x > threshold:
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row
Does anyone know how to apply my UDF to the DataFrame?
Created 08-22-2017 01:47 PM
This should work for you:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

# Create your UDF object (which accepts your python function called "my_udf")
udf_object = udf(my_udf, ArrayType(StringType()))

# Apply the UDF to your DataFrame (called "df")
new_df = df.withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))
If you want to make this better, replace "ArrayType(StringType())" with a schema that matches your data, such as:
schema = ArrayType(StructType([
    StructField("mychar", StringType(), False),
    StructField("myint", IntegerType(), False)
]))
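For reference, here is a minimal end-to-end sketch of this pattern, assuming a toy two-column DataFrame; "another_function" is a hypothetical stand-in for whatever transformation you need, and I use a plain StructType return (rather than an array) so the fields of the new column can be selected by name:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame standing in for your data (assumed shape)
df = spark.createDataFrame([(12, 5), (3, 7)], ["val_x", "val_y"])

def another_function(v):
    # Hypothetical placeholder for your per-value transformation
    return v * 2

def my_udf(row):
    # row arrives as a Row built from the struct() of all columns
    threshold = 10
    if row.val_x > threshold:
        return (another_function(row.val_x), another_function(row.val_y))
    return (row.val_x, row.val_y)

# Declare the return schema so the new column has typed, named fields
schema = StructType([
    StructField("val_x", IntegerType(), False),
    StructField("val_y", IntegerType(), False),
])

udf_object = udf(my_udf, schema)
new_df = df.withColumn("new_column", udf_object(struct(*df.columns)))

# Nested fields can then be pulled out by name
new_df.select("new_column.val_x", "new_column.val_y").show()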
Hope this helps!
Created 08-22-2017 04:02 PM
When I try this I get the following error:
NameError: global name 'struct' is not defined
I tried the same with StructType instead of struct and got:
AssertionError: fields should be a list of StructField
Created 08-23-2017 01:25 PM
You need to import the required libraries:
from pyspark.sql.functions import struct
You could also just import everything from the functions module:
from pyspark.sql.functions import *
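One caveat with the wildcard import: pyspark.sql.functions defines names like sum, max and round that shadow the Python built-ins. Importing the module under an alias avoids this; reusing the df and udf_object names from the answer above, that would look like:

from pyspark.sql import functions as F

new_df = df.withColumn("new_column", udf_object(F.struct([df[x] for x in df.columns])))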
Created 08-12-2019 11:33 AM
I have a PySpark dataframe with 87 columns. I want to pass each row of the dataframe to a function and get a list back for each row, so that I can build a new column from it.
def make_range_vector(row, categories, ledger):
    print(type(row), type(categories), type(ledger))
    category_vector = []
    for category in categories:
        if row[category] != 0:
            category_percentage = func.round(row[category] * 100 / row[ledger])
            category_vector.append(category_percentage)
        else:
            category_vector.append(0)
    category_vector = sqlCtx.createDataFrame(category_vector, IntegerType())
    return category_vector

pivot_card.withColumn('category_debit_vector',
                      make_range_vector(struct([pivot_card[x] for x in pivot_card.columns]),
                                        pivot_card.columns[3:], 'debit'))
I am a beginner in PySpark, and I can't find answers to the questions below.
1. The print statement outputs <class 'pyspark.sql.column.Column'> <class 'list'> <class 'str'>. Shouldn't the first one be StructType?
2. Can I pass a Row object and do something similar, like we do in Pandas?
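For anyone who lands here: the print shows Column because make_range_vector is called once, while the query is being built, with a Column object as its argument; to run it per row it has to be wrapped in udf, as in the accepted answer above, and then it does receive a Row, much like a Pandas apply. A minimal sketch of that rewrite (untested against the actual data; plain Python arithmetic replaces func.round, and the list itself is returned instead of creating a DataFrame inside the UDF):

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, IntegerType

def make_range_vector(row, categories, ledger):
    # row is now a Row with concrete values, not a Column
    vector = []
    for category in categories:
        if row[category] != 0:
            vector.append(int(round(row[category] * 100 / row[ledger])))
        else:
            vector.append(0)
    return vector

categories = pivot_card.columns[3:]
vector_udf = udf(lambda row: make_range_vector(row, categories, 'debit'),
                 ArrayType(IntegerType()))

pivot_card = pivot_card.withColumn(
    'category_debit_vector',
    vector_udf(struct([pivot_card[x] for x in pivot_card.columns]))
)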