Created 02-03-2018 08:45 AM
Hi, below are the input and output schemas.
i/p: row_id, ODS_WII_VERB, stg_load_ts, other_columns
o/p: the max timestamp, grouped by row_id and ODS_WII_VERB
Issue: since only row_id and ODS_WII_VERB appear in the group by clause, we are unable to get the other columns. How can we retrieve the other columns as well? We tried creating a Spark SQL subquery, but subqueries do not seem to work in Spark Structured Streaming. How can we resolve this?
Code snippet:

val csvDF = sparkSession
  .readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("C:\\Users\\M1037319\\Desktop\\data")

val updatedDf = csvDF.withColumn("ODS_WII_VERB", regexp_replace(col("ODS_WII_VERB"), "I", "U"))
updatedDf.printSchema()

val grpbyDF = updatedDf.groupBy("ROW_ID", "ODS_WII_VERB").max("STG_LOAD_TS")
Created 02-03-2018 02:47 PM
Apply an aggregate (agg) function after the groupBy:
val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").agg(max("STG_LOAD_TS"))
Now the grpbyDF dataframe groups by ROW_ID, ODS_WII_VERB and gets the max value of the STG_LOAD_TS column.
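For context, here is a minimal end-to-end sketch of this answer against the code from the original post (userSchema and the input path are the question's own placeholders; the console sink and "complete" output mode are assumptions for illustration, since streaming aggregations without a watermark require complete mode):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, regexp_replace}

val spark = SparkSession.builder().appName("GroupByMaxTs").getOrCreate()

// Same streaming CSV source as in the question
val csvDF = spark.readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("C:\\Users\\M1037319\\Desktop\\data")

val updatedDf = csvDF.withColumn("ODS_WII_VERB", regexp_replace(col("ODS_WII_VERB"), "I", "U"))

// agg(max(...)) instead of .max(...) so further aggregates can be added later
val grpbyDF = updatedDf.groupBy("ROW_ID", "ODS_WII_VERB").agg(max("STG_LOAD_TS"))

// Streaming aggregations without a watermark require "complete" output mode (assumed sink)
grpbyDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()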
Created 02-04-2018 05:28 AM
Hi @Shu,
I have a few other columns in the input apart from ROW_ID and ODS_WII_VERB, but they are not part of the group by clause. How can I retrieve those columns as well?
Created 02-04-2018 05:59 AM
To get the non-group-by columns from a grouped dataframe, we need to apply one of the aggregate (agg) functions (max, min, mean, sum, etc.) to each of the non-group-by columns.
Example:-
val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").agg(max("STG_LOAD_TS"), min("non groupby column"), mean("non groupby column"), sum("non groupby column"))
In the grpbyDF above we group by ROW_ID and ODS_WII_VERB, and every non-group-by column goes into the agg function with one of those functions (max, min, mean, or sum).
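As a concrete illustration of this pattern in the Scala code from the original post (other_col1 and other_col2 are hypothetical stand-ins for the extra input columns; max also works on string columns, comparing lexicographically):

import org.apache.spark.sql.functions.max

// Every non-group-by column gets its own aggregate; aliases keep the original column names
val grpbyDF = updatedDf
  .groupBy("ROW_ID", "ODS_WII_VERB")
  .agg(
    max("STG_LOAD_TS").as("STG_LOAD_TS"),
    max("other_col1").as("other_col1"),  // hypothetical extra column
    max("other_col2").as("other_col2")   // hypothetical extra column
  )

Note that this returns the per-group maximum of each column independently, not the other column values from the row that holds the max STG_LOAD_TS.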
Please refer to the link below for more details about groupBy:
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy
Created 02-04-2018 07:22 AM
Thanks a lot for the answer. In my case the non-group-by columns are string data types. Can I use string columns in the aggregation functions?
Alternatively, can I create a temp view on the dataframe and then use a subquery to retrieve the results? Is this possible in Structured Streaming?
Created 02-04-2018 03:13 PM
If you have integer or float values represented as string data types, then you can use those string columns in aggregations.
Example:-
If values such as 10 or 10.5 in an age column are represented as a string data type, we can use aggregate functions on the age column directly:
from pyspark.sql.functions import col, sum

gr_df2 = gr_df.agg(sum(col('age')))
(or)
You can also cast string data types to int, double, etc. in the aggregation:
from pyspark.sql.types import *

gr_df2 = gr_df.agg(sum(col('age').cast("int")))
This casts the age column to integer and applies the aggregate function to it.
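The same cast-then-aggregate idea in the Scala API used in the original question (a sketch; gr_df and the age column are carried over from the answer's example):

import org.apache.spark.sql.functions.{col, sum}

// Cast the string column to int before aggregating, mirroring the PySpark snippet above
val gr_df2 = gr_df.agg(sum(col("age").cast("int")))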
You can also create a temp table on the dataframe and run your SQL queries against the temp table:
gr_df2.registerTempTable("people")
hc.sql("select * from people").show(10,False)
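For the streaming dataframe from the original question, the same approach works with createOrReplaceTempView (the Spark 2.x replacement for the deprecated registerTempTable) and spark.sql. A sketch, assuming updatedDf from the first post and a console sink, and using a plain GROUP BY rather than a subquery, since the original post reports subqueries not working in Structured Streaming:

// Register the streaming dataframe as a SQL view and aggregate it with plain SQL
updatedDf.createOrReplaceTempView("events")

val resultDF = spark.sql(
  """SELECT ROW_ID, ODS_WII_VERB, max(STG_LOAD_TS) AS max_ts
    |FROM events
    |GROUP BY ROW_ID, ODS_WII_VERB""".stripMargin)

// The result of spark.sql on a streaming view is itself a streaming dataframe
resultDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()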