Row-wise manipulation of a DataFrame in PySpark

Contributor

I have a DataFrame and want to do some manipulation of the data in a function, depending on the values of the row.

def my_udf(row):
    threshold = 10
    if row.val_x > threshold:
        # another_function is defined elsewhere
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row

Does anyone know how to apply my UDF to the DataFrame?

4 REPLIES


@Lukas Müller

This should work for you:

from pyspark.sql.types import *
from pyspark.sql.functions import udf

# Create your UDF object (which accepts your python function called "my_udf")
udf_object = udf(my_udf, ArrayType(StringType()))

# Apply the UDF to your Dataframe (called "df")
new_df = df.withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))

If you want to make this more robust, replace "ArrayType(StringType())" with a schema that matches what the UDF returns, for example:

schema = ArrayType(StructType([
    StructField("mychar", StringType(), False),
    StructField("myint", IntegerType(), False)
])) 
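For reference, a minimal end-to-end sketch of how the struct input and an explicit return schema fit together (the column names val_x/val_y, the threshold, and another_function are placeholders taken from the question; since the UDF here returns a single struct per row, a plain StructType is used rather than wrapping it in ArrayType):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import udf, struct, col

spark = SparkSession.builder.getOrCreate()

# Small example frame; val_x / val_y stand in for the question's columns
df = spark.createDataFrame([(5, 1), (20, 2)], ["val_x", "val_y"])

def another_function(v):
    # Placeholder for whatever per-value transformation is needed
    return v * 2

def my_udf(row):
    threshold = 10
    if row.val_x > threshold:
        # Row objects are immutable, so return new values rather than mutating
        return (another_function(row.val_x), another_function(row.val_y))
    return (row.val_x, row.val_y)

# Declare the fields the UDF returns
schema = StructType([
    StructField("val_x", IntegerType(), False),
    StructField("val_y", IntegerType(), False),
])

udf_object = udf(my_udf, schema)

# Pack all columns into one struct, apply the UDF, then unpack the result
new_df = (df
          .withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))
          .withColumn("new_val_x", col("new_column.val_x"))
          .withColumn("new_val_y", col("new_column.val_y")))
new_df.show()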

Hope this helps!

Contributor

When I try this I get the following error:

NameError: global name 'struct' is not defined

I tried the same with StructType instead of struct and got:

AssertionError: fields should be a list of StructField


Hi @Lukas Müller

You need to import the required libraries:

from pyspark.sql.functions import struct

You could also just import everything from the functions module:

from pyspark.sql.functions import *
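As a side note on the AssertionError above: struct (from pyspark.sql.functions) and StructType (from pyspark.sql.types) are different things, so one cannot stand in for the other. A minimal sketch of which import goes where, reusing my_udf and df from the original question:

from pyspark.sql.functions import struct, udf   # struct() packs columns into one struct column
from pyspark.sql.types import StructType, StructField, StringType, IntegerType   # schema classes

# StructType describes the UDF's declared return type ...
schema = StructType([
    StructField("mychar", StringType(), False),
    StructField("myint", IntegerType(), False),
])
udf_object = udf(my_udf, schema)

# ... while struct() builds the single input column that is passed to the UDF
new_df = df.withColumn("new_column", udf_object(struct([df[x] for x in df.columns])))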

New Contributor

I have a PySpark DataFrame with 87 columns. I want to pass each row of the DataFrame to a function and get a list back for each row so that I can create a separate column from it.


PySpark code

UDF:

def make_range_vector(row, categories, ledger):
    print(type(row), type(categories), type(ledger))
    category_vector = []
    for category in categories:
        if row[category] != 0:
            category_percentage = func.round(row[category] * 100 / row[ledger])
            category_vector.append(category_percentage)
        else:
            category_vector.append(0)
    category_vector = sqlCtx.createDataFrame(category_vector, IntegerType())
    return category_vector

Main function

pivot_card.withColumn('category_debit_vector',
                      make_range_vector(struct([pivot_card[x] for x in pivot_card.columns]),
                                        pivot_card.columns[3:], 'debit'))

I am a beginner in PySpark, and I can't find answers to the questions below.

  1. The print statement outputs <class 'pyspark.sql.column.Column'> <class 'list'> <class 'str'>. Shouldn't it be StructType?

  2. Can I pass a Row object and do something similar, like we do in Pandas?
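On question 1: calling make_range_vector directly inside withColumn evaluates it eagerly on Column objects, which is why the print shows pyspark.sql.column.Column; the function is never given actual row values. A minimal sketch of the registered-UDF pattern instead, assuming the function should return a plain Python list of integers rather than a DataFrame, and reusing pivot_card from the post above:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, IntegerType

def make_range_vector(row, categories, ledger):
    # row is now a real Row, so plain Python arithmetic works here
    return [int(round(row[c] * 100 / row[ledger])) if row[c] != 0 else 0
            for c in categories]

categories = pivot_card.columns[3:]
vector_udf = udf(lambda r: make_range_vector(r, categories, 'debit'),
                 ArrayType(IntegerType()))

pivot_card = pivot_card.withColumn(
    'category_debit_vector',
    vector_udf(struct([pivot_card[x] for x in pivot_card.columns]))
)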