New Contributor
Posts: 1
Registered: ‎10-31-2017

How to get the top 1 row in Spark Structured Streaming?

I have an issue with Spark Structured Streaming (Spark 2.2.1). I am developing a real-time pipeline that first reads data from Kafka, then joins it with another table, and then passes the DataFrame to an ALS model (Spark ML), which returns a streaming DataFrame with one additional column, prediction. The problem is that I cannot find a way to get the row with the highest score.

I tried:
1. Applying SQL functions such as limit, take, and sort
2. The dense_rank() function
3. Searching Stack Overflow

I read the Unsupported Operations section of the documentation, but there doesn't seem to be much there.

Additionally, I would like to send the row with the highest score to a Kafka queue.

My code is as follows:

 

import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._ // assumes a SparkSession named spark; needed for $"..."

// Parse the Kafka value as JSON and select the fields used downstream
val result = lines.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", mySchema).as("data"))
  //.select("data.*")
  .selectExpr(
    "cast(data.largo as int) as largo", "cast(data.stock as int) as stock",
    "data.verificavalormax", "data.codbc", "data.ide", "data.timestamp_cli",
    "data.tef_cli", "data.nombre", "data.descripcion", "data.porcentaje",
    "data.fechainicio", "data.fechafin", "data.descripcioncompleta",
    "data.direccion", "data.coordenadax", "data.coordenaday",
    "data.razon_social", "data.segmento_app", "data.categoria", "data.subcategoria")
result.printSchema()

val model = ALSModel.load("ALSParaTiDos")

val fullPredictions = model.transform(result)

// fullPredictions is a streaming DataFrame with an extra column "prediction"; here I need the code to get the first row

val query = fullPredictions.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .option("truncate", "false")
  .start()
query.awaitTermination()
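
One possible direction, offered as a sketch and not as a confirmed solution: sorting and limit are restricted on streaming DataFrames, but a global aggregation is allowed, and max over a struct compares the first field first. The example below assumes fullPredictions from the code above has a numeric prediction column, and it carries ide and nombre along only as an illustrative choice of columns. The broker address, topic name, and checkpoint path are hypothetical placeholders. Note that this yields the running maximum over the whole stream so far, not the top row of each micro-batch.

```scala
import org.apache.spark.sql.functions.{col, isnan, max, struct, to_json}
import org.apache.spark.sql.streaming.OutputMode

// ALS can emit NaN predictions for unseen users/items, and Spark orders NaN
// above every other value, so drop those rows before taking the maximum.
val top1 = fullPredictions
  .filter(!isnan(col("prediction")))
  // Structs are compared field by field, so putting prediction first makes
  // max(...) return the row fields that belong to the highest prediction.
  // ide and nombre are an illustrative choice of columns to keep.
  .agg(max(struct(col("prediction"), col("ide"), col("nombre"))).as("top"))
  .select("top.*")

// A streaming aggregation cannot use Append mode without a watermark,
// so Complete mode is used here.
val kafkaQuery = top1
  // The Kafka sink expects a string or binary column named "value".
  .select(to_json(struct(top1.columns.map(col): _*)).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // hypothetical broker
  .option("topic", "top-predictions")                    // hypothetical topic
  .option("checkpointLocation", "/tmp/top1-checkpoint")  // hypothetical path
  .outputMode(OutputMode.Complete())
  .start()

kafkaQuery.awaitTermination()
```

If the highest score per time interval is what is needed instead, the same aggregation could presumably be grouped by a window over an event-time column, which would require that column to be a proper timestamp.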