Created 08-22-2017 01:26 PM
I have a DataFrame and want to manipulate its data in a function, depending on the values in each row:
def my_udf(row):
    threshold = 10
    if row.val_x > threshold:
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row
Does anyone know how to apply my_udf to the DataFrame?
Created 08-22-2017 01:47 PM
This should work for you:
from pyspark.sql.types import *
from pyspark.sql.functions import udf
# Create your UDF object (which wraps your Python function called "my_udf")
udf_object = udf(my_udf, ArrayType(StringType()))
# Apply the UDF to your DataFrame (called "df")
new_df = df.withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))
If you want to make this better, replace ArrayType(StringType()) with a schema that describes what your function returns, such as:
schema = ArrayType(StructType([
    StructField("mychar", StringType(), False),
    StructField("myint", IntegerType(), False)
]))
Hope this helps!
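For reference, here is a minimal end-to-end sketch of the same approach. It assumes integer columns val_x and val_y and a hypothetical another_function that simply doubles its input; transform_values is an illustrative rewrite of my_udf, not the poster's code. Two details differ from the snippets above: pyspark Row objects are read-only, so the function returns new values instead of assigning to row.val_x, and the UDF declares a StructType return type that matches the returned tuple. The Spark imports sit inside main() only so that transform_values can be checked without a Spark installation.

```python
# Minimal sketch: apply a row-wise Python function with a UDF.
# Hypothetical assumptions: integer columns val_x and val_y, and a
# placeholder another_function standing in for the poster's helper.

THRESHOLD = 10


def another_function(value):
    """Hypothetical stand-in for the poster's helper: doubles the value."""
    return value * 2


def transform_values(row):
    """Rewrite of my_udf: return (val_x, val_y), transformed above THRESHOLD.

    `row` is anything indexable by column name: a pyspark Row inside the
    UDF, or a plain dict in a quick test. Rows are read-only, so the
    function returns new values instead of assigning to row.val_x.
    """
    val_x, val_y = row["val_x"], row["val_y"]
    if val_x > THRESHOLD:
        return (another_function(val_x), another_function(val_y))
    return (val_x, val_y)


def main():
    # Spark imports live here so transform_values can be tested without Spark.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import LongType, StructField, StructType

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(5, 1), (20, 2)], ["val_x", "val_y"])

    # The declared return type must match what the function returns:
    # a 2-tuple of ints maps onto a struct with two long fields.
    result_schema = StructType([
        StructField("val_x", LongType(), True),
        StructField("val_y", LongType(), True),
    ])
    transform_udf = udf(transform_values, result_schema)

    # struct(*df.columns) packs all columns into one struct column, which
    # the UDF receives as a Row; "result.*" unpacks the returned struct.
    new_df = (
        df.withColumn("result", transform_udf(struct(*df.columns)))
        .select("result.*")
    )
    new_df.show()
    spark.stop()
```

Calling main() on a machine with PySpark installed runs the sketch on a two-row toy DataFrame.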
Created 08-22-2017 04:02 PM
When I try this, I get the following error:
NameError: global name 'struct' is not defined
I tried the same with StructType instead of struct and got:
AssertionError: fields should be a list of StructField
Created 08-23-2017 01:25 PM
You need to import the struct function as well:
from pyspark.sql.functions import struct
Alternatively, you can import every function in the module (note that this shadows Python built-ins such as sum, min, and max):
from pyspark.sql.functions import *
Created 08-12-2019 11:33 AM
I have a PySpark DataFrame with 87 columns. I want to pass each row to a function that returns a list, and store that list in a new column.
def make_range_vector(row, categories, ledger):
    print(type(row), type(categories), type(ledger))
    category_vector = []
    for category in categories:
        if row[category] != 0:
            category_percentage = func.round(row[category] * 100 / row[ledger])
            category_vector.append(category_percentage)
        else:
            category_vector.append(0)
    category_vector = sqlCtx.createDataFrame(category_vector, IntegerType())
    return category_vector

pivot_card.withColumn('category_debit_vector', (make_range_vector(struct([pivot_card[x] for x in pivot_card.columns]), pivot_card.columns[3:], 'debit')))
I am a beginner in PySpark and can't find answers to the following questions:
The print statement outputs <class 'pyspark.sql.column.Column'> <class 'list'> <class 'str'>. Shouldn't it be StructType?
Can I pass a Row object and work with it the way we do in Pandas?
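One way to get the per-row list is a minimal sketch like the following, assuming a toy pivot_card with hypothetical columns id, name, debit, food, and travel, so that columns[3:] are the categories as in the question. It also bears on both questions. Called directly, make_range_vector runs once on the driver while the query is being built, so row is a Column expression rather than data. Wrapped in udf(...), Spark calls it once per row and the struct arrives as a Row, which supports row["col"] and row.asDict(), much like a pandas row. Inside the UDF the values are plain Python numbers, so the sketch uses Python's round rather than pyspark.sql.functions.round, and it returns a list instead of creating a DataFrame.

```python
# Minimal sketch: per-row list of category percentages via a UDF.
# Hypothetical assumptions: a ledger column "debit" and category columns
# from index 3 onward, mirroring pivot_card.columns[3:] in the question.


def make_range_vector(row, categories, ledger):
    """Return one rounded percentage per category for a single row.

    Inside a UDF, `row` is a pyspark Row, so row[name] and row.asDict()
    both work; a plain dict also works, which makes local testing easy.
    """
    total = row[ledger]
    vector = []
    for category in categories:
        value = row[category]
        if value and total:
            # Python's built-in round: these are plain numbers, not Columns.
            vector.append(round(value * 100 / total))
        else:
            # Zero or missing values, and zero ledgers, map to 0.
            vector.append(0)
    return vector


def main():
    # Spark imports live here so make_range_vector can be tested without Spark.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import ArrayType, IntegerType

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    # Hypothetical toy data: two key columns, the ledger, two categories.
    pivot_card = spark.createDataFrame(
        [("a", "x", 200, 50, 150), ("b", "y", 0, 0, 0)],
        ["id", "name", "debit", "food", "travel"],
    )
    categories = pivot_card.columns[3:]

    # The UDF takes a single struct column; the lambda binds the extra
    # arguments so Spark passes only the Row.
    vector_udf = udf(
        lambda row: make_range_vector(row, categories, "debit"),
        ArrayType(IntegerType()),
    )
    result = pivot_card.withColumn(
        "category_debit_vector", vector_udf(struct(*pivot_card.columns))
    )
    result.show(truncate=False)
    spark.stop()
```

A Python UDF moves every row through a Python worker, which can be slow with 87 columns; the same list could likely also be built with built-in functions such as array, when, and round from pyspark.sql.functions, which keeps the work inside Spark.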