
How to get the non group by columns in spark structured streaming

Explorer

Hi, below are the input schema and the desired output.

Input: row_id, ODS_WII_VERB, stg_load_ts, other_columns

Output: the max timestamp, grouped by row_id and ODS_WII_VERB

Issue: Because we use only row_id and ODS_WII_VERB in the group by clause, we are unable to get the other columns. How can we get the other columns as well? We tried a Spark SQL subquery, but subqueries do not seem to work in Spark Structured Streaming. How can we resolve this issue?

code snippet

val csvDF = sparkSession
  .readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("C:\\Users\\M1037319\\Desktop\\data")

val updatedDf = csvDF.withColumn("ODS_WII_VERB", regexp_replace(col("ODS_WII_VERB"), "I", "U"))
updatedDf.printSchema()

val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").max("STG_LOAD_TS")


Master Guru

@elango vaithiyanathan

Apply the aggregate (agg) function after the group by:

import org.apache.spark.sql.functions.max
val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").agg(max("STG_LOAD_TS"))

The grpbyDF DataFrame now groups by ROW_ID and ODS_WII_VERB and returns the max value of the STG_LOAD_TS column for each group.

Explorer

Hi Shu,

@shu

I have a few other columns in the input apart from ROW_ID and ODS_WII_VERB, but they are not part of the group by clause. How can I retrieve those columns as well?

Master Guru

@elango vaithiyanathan

To get the non group by columns from a grouped DataFrame, we need to apply one of the aggregate (agg) functions (max, min, mean, sum, etc.) to each non group by column.

Example:

val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").agg(max("STG_LOAD_TS"),min("non groupby column"),mean("non groupby column"),sum("non groupby column"))

In the above grpbyDF we group by ROW_ID and ODS_WII_VERB, and every non group by column is wrapped in one of the aggregate functions (max, min, mean or sum).

Please refer to the link below for more details about groupBy:
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy
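
One caveat with aggregating each column separately is that the results can come from different rows of the same group. If the goal is to keep the other columns of the row that has the max STG_LOAD_TS, a common approach is to take the max of a struct whose first field is the timestamp, since Spark compares structs field by field from left to right. The following is a minimal sketch on a batch DataFrame with made-up sample data; OTHER_COL is a hypothetical stand-in for the remaining columns, and the timestamps are assumed to be strings in a sortable format. On a streaming DataFrame the same agg should apply, though the query would then need an output mode that supports aggregation (for example complete or update).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, struct}

object LatestRowPerKey {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("latest-row-per-key")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data; OTHER_COL stands in for the non group by columns.
    val df = Seq(
      ("1", "U", "2018-01-01 10:00:00", "a"),
      ("1", "U", "2018-01-02 10:00:00", "b"),
      ("2", "D", "2018-01-01 09:00:00", "c")
    ).toDF("ROW_ID", "ODS_WII_VERB", "STG_LOAD_TS", "OTHER_COL")

    val latest = df
      .groupBy("ROW_ID", "ODS_WII_VERB")
      // The timestamp is the first struct field, so it decides the ordering;
      // OTHER_COL only breaks ties and is carried along from the same row.
      .agg(max(struct(col("STG_LOAD_TS"), col("OTHER_COL"))).as("latest"))
      .select(
        col("ROW_ID"),
        col("ODS_WII_VERB"),
        col("latest.STG_LOAD_TS"),
        col("latest.OTHER_COL"))

    latest.show(false)
    spark.stop()
  }
}
```

More columns can be added to the struct after the timestamp and read back with the same latest.<name> pattern.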

Explorer

@Shu

Thanks a lot for the answer. In my case the non group by columns are string data types. Can I use string columns in the aggregation functions?

Can I create a temp view on the DataFrame and then use a subquery to retrieve the results? Is this possible in Structured Streaming?

Master Guru
@elango vaithiyanathan

If you have integer or float values stored as strings, you can use those string columns in aggregations.

Example:

If the age column holds values such as 10 and 10.5 as strings, we can apply aggregate functions to it directly:

from pyspark.sql.functions import col, sum
gr_df2=gr_df.agg(sum(col('age')))

(or)

You can also cast string columns to int, double, etc. inside the aggregation:

gr_df2=gr_df.agg(sum(col('age').cast("int")))

This casts the age column to integer and then applies the aggregate function to it.
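
For the Scala code in the question, the same idea might look like the sketch below. It uses made-up data with hypothetical column names (grp, age, name): the numeric text column is cast explicitly before sum, and max is applied to a plain text column, where the comparison is a string comparison rather than a numeric one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, sum}

object StringAggregates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("string-aggregates")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical data: age is numeric text, name is free text.
    val people = Seq(
      ("g1", "10", "alice"),
      ("g1", "10.5", "bob"),
      ("g2", "7", "carol")
    ).toDF("grp", "age", "name")

    val result = people.groupBy("grp").agg(
      sum(col("age").cast("double")).as("age_sum"), // explicit cast before summing
      max(col("name")).as("max_name")               // max on strings uses string ordering
    )

    result.show(false)
    spark.stop()
  }
}
```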

Create a temp view on the DataFrame and run your SQL queries against it (createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0):

gr_df2.createOrReplaceTempView("people")
hc.sql("select * from people").show(10,False)
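
For the streaming case asked about above, a temp view can also be registered on a streaming DataFrame, and spark.sql on that view returns another streaming DataFrame. Because show() is not supported on streaming DataFrames, the result has to be started with writeStream. The sketch below is only an illustration: it uses Spark's built-in rate source instead of the CSV directory from the question, and the view name events and the grouping key are assumptions made for the example.

```scala
import org.apache.spark.sql.SparkSession

object StreamingTempView {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("streaming-temp-view")
      .getOrCreate()

    // The built-in "rate" source emits rows with (timestamp, value) columns.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Register the streaming DataFrame as a temp view (hypothetical name).
    stream.createOrReplaceTempView("events")

    // SQL over the view yields a streaming DataFrame.
    val agg = spark.sql(
      """SELECT value % 3 AS key, max(timestamp) AS max_ts
        |FROM events
        |GROUP BY value % 3""".stripMargin)

    // Aggregations without a watermark need complete or update output mode.
    val query = agg.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination(10000) // run for roughly 10 seconds, then shut down
    query.stop()
    spark.stop()
  }
}
```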