How to get the top 1 row in Spark Structured Streaming?


I have an issue with Spark Structured Streaming (Spark 2.2.1). I am developing a real-time pipeline that first reads data from Kafka, then joins it with another table, and then passes the DataFrame to an ALS model (Spark ML), which returns a streaming DataFrame with one additional column, prediction. The problem is that I can't find a way to get the row with the highest score.

I tried:
1. Applying SQL functions such as limit, take, and sort
2. The dense_rank() function
3. Searching Stack Overflow

I read the Unsupported Operations section of the documentation, but it doesn't seem to say much about this.

Additionally, I would like to send the row with the highest score to a Kafka queue.

My code is as follows:


import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode

// lines is the streaming DataFrame read from Kafka; mySchema is the schema of the JSON payload.
val result = lines.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", mySchema).as("data"))
  .selectExpr("cast(data.largo as int) as largo", "cast(data.stock as int) as stock",
    "data.verificavalormax", "data.codbc", "data.ide", "data.timestamp_cli", "data.tef_cli",
    "data.nombre", "data.descripcion", "data.porcentaje", "data.fechainicio", "data.fechafin",
    "data.descripcioncompleta", "data.direccion", "data.coordenadax", "data.coordenaday",
    "data.razon_social", "data.segmento_app", "data.categoria", "data.subcategoria")

val model = ALSModel.load("ALSParaTiDos")
val fullPredictions = model.transform(result)

// fullPredictions is a streaming DataFrame with an extra column "prediction"; here I need the code to get the first row

val query = fullPredictions.writeStream.format("console")
  .outputMode(OutputMode.Append()).option("truncate", "false").start()
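
One direction that may be worth checking, as a sketch rather than a confirmed fix: since sort and limit are restricted on streaming DataFrames, "top 1" can be written as an aggregation, taking max() over a struct whose first field is prediction. The object name, the topByPrediction helper, and the sample data below are hypothetical; a static DataFrame stands in for fullPredictions so the snippet can be run locally.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, isnan, max, struct}

// Hypothetical helper, not part of the original pipeline.
object TopPredictionSketch {

  // Returns a one-row DataFrame holding the row with the largest "prediction".
  // max() over a struct compares fields left to right, so "prediction" must be
  // the first field; the carried columns simply travel with the winning row.
  def topByPrediction(df: DataFrame, carried: Seq[String]): DataFrame = {
    val fields = col("prediction") +: carried.map(c => col(c))
    df.filter(!isnan(col("prediction"))) // Spark SQL orders NaN above every number
      .agg(max(struct(fields: _*)).as("top"))
      .select("top.*")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("top1-sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed sample rows standing in for the streaming fullPredictions DataFrame.
    val scored = Seq(("a", 0.2f), ("b", 0.9f), ("c", 0.5f)).toDF("ide", "prediction")

    topByPrediction(scored, Seq("ide")).show()

    spark.stop()
  }
}
```

On a streaming DataFrame this aggregation would need outputMode("complete") or outputMode("update") instead of Append, and it yields the highest score seen since the query started, not the top row of each micro-batch. If a per-batch top row is what is needed, this sketch does not cover it; foreachBatch, which could express that, only arrived in Spark 2.4.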