11-15-2016
02:10 PM
1 Kudo
If you are only concerned with static DataFrames (and not streaming), this is pretty straightforward programmatically. To create a DataFrame from a Hive table with an example query:

df = spark.sql("SELECT * FROM table_name1")

To save a DataFrame back to a Hive table:

df.write.saveAsTable('table_name2', format='parquet', mode='overwrite')

Now, you may want to try listing databases instead of tables. Listing tables only shows the tables associated with your current database, and the default database is likely empty if you're just starting out. My struggle is with Spark Streaming in version 2.0.0.cloudera.beta1, where the saveAsTable method is not available for a streaming DataFrame. That makes it all a bit trickier than the static DataFrame read/write.
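For reference, a minimal sketch of listing databases and then the tables within one (assumes a SparkSession named spark, as in the snippets above; the database name is just a placeholder):

# Show every database visible to the session
spark.sql("SHOW DATABASES").show()

# Or use the catalog API
print(spark.catalog.listDatabases())

# Switch to a (placeholder) database and list its tables
spark.sql("USE my_database")
print(spark.catalog.listTables())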
10-12-2016
07:31 AM
Thanks, the UDF approach does work in this case.

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Waveform peak amplitude
udf_wf_peak = udf(lambda x: max(x), returnType=FloatType())  # Define UDF function
df = df.withColumn('WF_Peak', udf_wf_peak('wfdataseries'))

However, using Numpy arrays and functions has proven tricky, as the Numpy float dtype evidently does not match Spark's FloatType(). In this case, when evaluating the variance of a Numpy array, I found a workaround: applying round(x, 10) converts the result back to a Python float. I suspect there's a more elegant solution, but this works for now.

# Waveform variance
udf_wf_var = udf(lambda x: round(np.var(np.array(x)), 10), returnType=FloatType())  # Define UDF function
df = df.withColumn('WF_Var', udf_wf_var('wfdataseries'))
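As a possible alternative to the round() workaround (just a sketch under the same setup, not verified here): explicitly casting the Numpy result to a plain Python float should also satisfy FloatType():

# Cast numpy.float64 to a Python float so it matches Spark's FloatType()
udf_wf_var = udf(lambda x: float(np.var(np.array(x))), returnType=FloatType())
df = df.withColumn('WF_Var', udf_wf_var('wfdataseries'))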
10-04-2016
08:04 AM
Pardon, as I am still a novice with Spark. I am working with a Spark DataFrame that has a column where each element contains a nested float array of variable length, typically 1024, 2048, or 4096 points. (These are vibration waveform signatures of different durations.) An example element in the 'wfdataseries' column would be [0.06692, 0.0805, 0.05738, 0.02046, -0.02518, ...]. A variety of metrics and statistics can be calculated from these blocks of vibration data. The goal is to extract calculated features from each array and place them in a new column in the same DataFrame. This is very easily accomplished with Pandas DataFrames:

from pyspark.sql import HiveContext, Row  # Import Spark Hive SQL
hiveCtx = HiveContext(sc)  # Construct SQL context
rows = hiveCtx.sql("SELECT collectiondate,serialno,system,accelerometerid,ispeakvue,wfdataseries,deltatimebetweenpoints,\
spectrumdataseries,maxfrequencyhz FROM test_vibration.vibrationblockdata")
import pandas as pd
df=rows.toPandas()
df['wfdataseries']=df.wfdataseries.apply(lambda x: x.encode('UTF8').split(','))
def str2flt(data): #Define function for converting list of strings to list of floats
return [float(value) for value in data]
df['wfdataseries']=df.wfdataseries.apply(str2flt)
df['WF_Peak']=df.wfdataseries.apply(lambda x: max(x)) #Calculate max value of nested array in each element of column 'wfdataseries'
# Various vibration waveform statistics
import numpy as np
from scipy import stats
df['WF_Var']=df.wfdataseries.apply(lambda x: np.var(np.array(x)))
df['WF_Kurt']=df.wfdataseries.apply(lambda x: stats.kurtosis(np.array(x),bias=True))
df['WF_Skew']=df.wfdataseries.apply(lambda x: stats.skew(np.array(x),bias=True))

Translating this functionality to the Spark DataFrame has been much more difficult. The first step was to split the comma-separated string element into an array of floats. Got that figured out:

from pyspark.sql import HiveContext  # Import Spark Hive SQL
hiveCtx = HiveContext(sc)  # Construct SQL context
df=hiveCtx.sql("SELECT serialno,system,accelerometerid,ispeakvue,wfdataseries,deltatimebetweenpoints,\
spectrumdataseries,maxfrequencyhz FROM test_vibration.vibrationblockdata")
from pyspark.sql.functions import col, split
df = df.withColumn('wfdataseries', split(col('wfdataseries'), ',').cast('array<float>'))

But now, how do I use withColumn() to calculate the maximum of the nested float array, or perform any other calculation on that array? I keep getting "'Column' object is not callable". Would an explode() method be needed in this case? I'd prefer something as elegant as Pandas, if possible. Also, I would need Numpy and Scipy calculations to be passed in for the array calculations, as was easily done in Pandas.

df = df.withColumn('WF_Peak', max('wfdataseries'))
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-97-be5735ba4c8a> in <module>()
----> 1 df = df.withColumn('WF_Peak',max('wfdataseries'))
TypeError: 'Column' object is not callable

df = df.withColumn('WF_Peak', df.wfdataseries.max())
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-99-a8b1116cac06> in <module>()
----> 1 df = df.withColumn('WF_Peak', df.wfdataseries.max())
TypeError: 'Column' object is not callable

Suppose I stick with Pandas and convert back to a Spark DataFrame before saving to a Hive table; would I be risking memory issues if the DataFrame is too large?
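For reference, a minimal sketch of how the Scipy statistics from the Pandas version might be expressed as UDFs, mirroring the approach from the 10-12-2016 post above (assumes 'wfdataseries' has already been cast to array<float>; results are cast to Python float to match FloatType()):

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import numpy as np
from scipy import stats

# Kurtosis and skew of each nested waveform array
udf_wf_kurt = udf(lambda x: float(stats.kurtosis(np.array(x), bias=True)), FloatType())
udf_wf_skew = udf(lambda x: float(stats.skew(np.array(x), bias=True)), FloatType())
df = df.withColumn('WF_Kurt', udf_wf_kurt('wfdataseries'))
df = df.withColumn('WF_Skew', udf_wf_skew('wfdataseries'))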
Labels:
- Apache Hive
- Apache Spark