Support Questions

Find answers, ask questions, and share your expertise

Turn on suggestions

Auto-suggest helps you quickly narrow down your search results by suggesting possible matches as you type.

Showing results for

- Cloudera Community
- :
- Support
- :
- Support Questions
- :
- Re: PySpark: How to add column to dataframe with c...

Announcements

Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Printer Friendly Page

- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Printer Friendly Page

New Contributor

Created 10-04-2016 08:04 AM

- Mark as New
- Bookmark
- Subscribe
- Subscribe to RSS Feed
- Permalink
- Email to a Friend
- Report Inappropriate Content

Pardon, as I am still a novice with Spark. I am working with a Spark dataframe, with a column where each element contains a nested float array of variable lengths, typically 1024, 2048, or 4096. (These are vibration waveform signatures of different duration.)

An example element in the 'wfdataseries' colunmn would be [0.06692, 0.0805, 0.05738, 0.02046, -0.02518, ...]. A variety of metrics and statistics can be calculated from these blocks of vibration data.

The goal is to extract calculated features from each array, and place in a new column in the same dataframe. This is very easily accomplished with Pandas dataframes:

from pyspark.sql import HiveContext, Row #Import Spark Hive SQL

hiveCtx = HiveContext(sc) #Cosntruct SQL context

rows=hiveCtx.sql("SELECT collectiondate,serialno,system,accelerometerid,ispeakvue,wfdataseries,deltatimebetweenpoints,\ spectrumdataseries,maxfrequencyhz FROM test_vibration.vibrationblockdata") import pandas as pd df=rows.toPandas() df['wfdataseries']=df.wfdataseries.apply(lambda x: x.encode('UTF8').split(',')) def str2flt(data): #Define function for converting list of strings to list of floats return [float(value) for value in data] df['wfdataseries']=df.wfdataseries.apply(str2flt) df['WF_Peak']=df.wfdataseries.apply(lambda x: max(x)) #Calculate max value of nested array in each element of column 'wfdataseries' # Various vibration waveform statistics import numpy as np from scipy import stats df['WF_Var']=df.wfdataseries.apply(lambda x: np.var(np.array(x))) df['WF_Kurt']=df.wfdataseries.apply(lambda x: stats.kurtosis(np.array(x),bias=True)) df['WF_Skew']=df.wfdataseries.apply(lambda x: stats.skew(np.array(x),bias=True))

Translating this functionality to the Spark dataframe has been much more difficult. The first step was to split the string CSV element into an array of floats. Got that figured out:

from pyspark.sql import HiveContext #Import Spark Hive SQL hiveCtx = HiveContext(sc) #Cosntruct SQL context df=hiveCtx.sql("SELECT serialno,system,accelerometerid,ispeakvue,wfdataseries,deltatimebetweenpoints,\ spectrumdataseries,maxfrequencyhz FROM test_vibration.vibrationblockdata") from pyspark.sql.functions import col, split df = df.withColumn('wfdataseries',split(col('wfdataseries'),',').cast('array<float>'))

But now, how do I use withColumn() to calculate the maximum of the nested float array, or perform any other calculation on that array? I keep getting "'Column' object is not callable".

Would an explode() method be needed in this case? I'd prefer something as elegant as Pandas if possible.

Also, I would need Numpy and Scipy calculations to be passed in for array calculations, as was easily done in Pandas.

df = df.withColumn('WF_Peak',max('wfdataseries')) --------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-97-be5735ba4c8a> in <module>() ----> 1 df = df.withColumn('WF_Peak',max('wfdataseries')) TypeError: 'Column' object is not callable

df = df.withColumn('WF_Peak', df.wfdataseries.max()) --------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-99-a8b1116cac06> in <module>() ----> 1 df = df.withColumn('WF_Peak', df.wfdataseries.max()) TypeError: 'Column' object is not callable

Suppose I stick with Pandas and convert back to a Spark DF before saving to Hive table, would I be risking memory issues if the DF is too large?

1 ACCEPTED SOLUTION

Accepted Solutions

Expert Contributor

Created 10-11-2016 07:18 PM

- Mark as New
- Bookmark
- Subscribe
- Subscribe to RSS Feed
- Permalink
- Email to a Friend
- Report Inappropriate Content

Hi Brian,

You shouldn't need to use exlode, that will create a new row for each value in the array. The reason max isn't working for your dataframe is because it is trying to find the max for that column for every row in you dataframe and not just the max in the array. Instead you will need to define a udf and call the udf within withColumn

from pyspark.sql.functions import udf def maxList(list): max(list) maxUdf==udf(scoreToCategory, FloatType()) df = df.withColumn('WF_Peak', maxUdf('wfdataseries'))

As for using pandas and converting back to Spark DF, yes you will have a limitation on memory. toPandas calls collect on the dataframe and brings the entire dataset into memory on the driver, so you will be moving data across network and holding locally in memory, so this should only be called if the DF is small enough to store locally.

3 REPLIES 3

Expert Contributor

Created 10-11-2016 07:18 PM

- Mark as New
- Bookmark
- Subscribe
- Subscribe to RSS Feed
- Permalink
- Email to a Friend
- Report Inappropriate Content

Hi Brian,

You shouldn't need to use exlode, that will create a new row for each value in the array. The reason max isn't working for your dataframe is because it is trying to find the max for that column for every row in you dataframe and not just the max in the array. Instead you will need to define a udf and call the udf within withColumn

from pyspark.sql.functions import udf def maxList(list): max(list) maxUdf==udf(scoreToCategory, FloatType()) df = df.withColumn('WF_Peak', maxUdf('wfdataseries'))

As for using pandas and converting back to Spark DF, yes you will have a limitation on memory. toPandas calls collect on the dataframe and brings the entire dataset into memory on the driver, so you will be moving data across network and holding locally in memory, so this should only be called if the DF is small enough to store locally.

Re: PySpark: How to add column to dataframe with calculation from nested array of floats

New Contributor

Created on 10-12-2016 07:31 AM - edited 10-12-2016 07:35 AM

- Mark as New
- Bookmark
- Subscribe
- Subscribe to RSS Feed
- Permalink
- Email to a Friend
- Report Inappropriate Content

Thanks, the UDF approach does work in this case.

#Waveform peak amplitude udf_wf_peak = udf(lambda x: max(x), returnType=FloatType()) #Define UDF function df = df.withColumn('WF_Peak',udf_wf_peak('wfdataseries'))

However, using Numpy arrays and functions has proven tricky, as the Numpy float dtype evidently does not match the Spark FloatType(). So in this case, where evaluating the variance of a Numpy array, I've found a work-around by applying round(x, 10), which converts it back. I suspect there's a more elegant solution, but that seems to work for now.

#Waveform Variance udf_wf_var = udf(lambda x: round(np.var(np.array(x)),10), returnType=FloatType()) #Define UDF function df = df.withColumn('WF_Var',udf_wf_var('wfdataseries'))

Highlighted
##

Re: PySpark: How to add column to dataframe with calculation from nested array of floats

Expert Contributor

Created 10-12-2016 06:00 PM

- Mark as New
- Bookmark
- Subscribe
- Subscribe to RSS Feed
- Permalink
- Email to a Friend
- Report Inappropriate Content

Great, I'm glad the udf worked. As for the numpy issue, I'm not familiar enough with using numpy within spark to give any insights, but the workaround seems trivial enough. If you are looking for a more elegant solution, you may want to create a new thread and include the error. You may also want to take a look at sparks mllib statistics functions[1], though they operate across rows instead of within a single column.

1. http://spark.apache.org/docs/latest/mllib-statistics.html

Coming from Hortonworks? Activate your account here