
Spark generate multiple rows based on column value


I have a DataFrame that looks like this:


  My requirement is: if datediff is 32, I need to get the per-day usage. For the first id, the datediff is 32, so the per-day usage will be 127/32. When I collect the result I should get:

I tried the following, but I got stuck at the first line: it transforms each input row into one single row, and I can't understand how to produce multiple rows from a single row based on datediff.

val df2 ="Id", "startdate", "enddate", "datediff", "did", "usage")
  .withColumn("Day", dayofmonth($"startdate"))
  .withColumn("Month", month($"startdate"))
  .withColumn("perdayusage", getperdayusageudf($"usage", $"datediff"))

How could I get these results back as a DataFrame?


Re: Spark generate multiple rows based on column value

I tried this with a UDF: I want to collect the values into a StringBuilder and then explode them in a next step. I am able to register the UDF, but I am unable to get results:

val myUdf = udf { (col1: Timestamp, col2: Timestamp, col3: Int, sqlContext: SQLContext) =>
  import sqlContext.implicits._
  val sb = StringBuilder.newBuilder
  if (col3 == 0) {
    val dd = dayofmonth(date_add($"col1", 1))
    val mm = month(date_add($"col1", 1))
    val yy = year(date_add($"col1", 1))
    val normalizedDate = concat(dd, mm, yy)
    sb.append(normalizedDate)
  } else {
    for (i <- 2 until col3) {
      val dd = dayofmonth(date_add($"col1", i))
      val mm = month(date_add($"col1", i))
      val yy = year(date_add($"col1", i))
      val normalizedDate = concat(dd, mm, yy)
      sb.append(normalizedDate)
java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function3
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:106)
  at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:56)
  ... 52 elided
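The exception points at two problems with this attempt: the lambda takes four parameters while the UDF is applied to three columns (so Spark expects a `Function3`), and a `SQLContext` cannot be passed in as a column in any case. Moreover, Column functions such as `date_add`, `dayofmonth`, and `concat` build expressions over columns; they cannot be evaluated on plain values inside a UDF body. A minimal sketch of the per-row logic using plain `java.time` arithmetic instead (the `ddMMyyyy` pattern and the `normalizedDates` name are illustrative assumptions; wrap the function with `org.apache.spark.sql.functions.udf` to register it):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Assumed output pattern, mirroring concat(dd, mm, yy) from the attempt above.
val fmt = DateTimeFormatter.ofPattern("ddMMyyyy")

// Pure per-row logic: no SQLContext, no Column functions.
def normalizedDates(start: LocalDate, datediff: Int): Seq[String] =
  if (datediff == 0) Seq(start.plusDays(1).format(fmt))
  else (2 until datediff).map(i => start.plusDays(i).format(fmt))

// normalizedDates(LocalDate.of(2017, 1, 1), 0) yields Seq("02012017")
```

A function like this can return a `Seq[String]` directly, which avoids the StringBuilder-then-split step entirely: the result column can be passed straight to `explode`.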

Re: Spark generate multiple rows based on column value

Expert Contributor

I think this is a duplicate of a question I have already answered.

Here is the working code again; you can convert the result to a DataFrame or continue with RDD operations:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, GregorianCalendar}

// The date format is an assumption; `format` was not defined in the original post.
val format = new SimpleDateFormat("yyyy-MM-dd")

// Build one "date,dayOfMonth,month" string per day in [startdate, enddate).
def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  val dateList = new ListBuffer[String]()
  val calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  while (calendar.getTime().before(enddate)) {
    dateList += calendar.getTime().toString().substring(0, 10) + "," +
      calendar.get(Calendar.DAY_OF_MONTH) + "," + calendar.get(Calendar.MONTH)
    calendar.add(Calendar.DATE, 1)

// Expand one CSV line (id,start,end,datediff,did,usage) into one tuple per day.
def getRddList(a: String): ListBuffer[(String, String, String, String, String)] = {
  val allDates = new ListBuffer[(String, String, String, String, String)]()
  val fields = a.split(",")
  for (x <- generateDates(format.parse(fields(1)), format.parse(fields(2)))) {
    allDates += ((fields(0), x, fields(3), fields(4), fields(5)))

val fileRdd = sc.textFile("/data_1/date1")
val myRdd = => getRddList(x)).flatMap(y => y)
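The same fan-out can be sanity-checked locally before running it on the RDD. A self-contained sketch (the sample line and the use of `java.time` in place of `GregorianCalendar` are illustrative assumptions):

```scala
import java.time.LocalDate

// Days in [start, end), analogous to generateDates above.
def datesBetween(start: LocalDate, end: LocalDate): Seq[LocalDate] =
  Iterator.iterate(start)(_.plusDays(1)).takeWhile(_.isBefore(end)).toSeq

// Expand one CSV line (id,start,end,datediff,did,usage) into one tuple per day.
def expandLine(line: String): Seq[(String, String, String, String, String)] = {
  val f = line.split(",")
  datesBetween(LocalDate.parse(f(1)), LocalDate.parse(f(2)))
    .map(d => (f(0), d.toString, f(3), f(4), f(5)))

val rows = expandLine("1,2017-01-01,2017-01-04,3,9,127")
// rows holds three tuples, one per day 2017-01-01 .. 2017-01-03
```

Applied per line with `flatMap`, this gives one record per day; with the SQL implicits in scope, `toDF(...)` on the resulting RDD then yields the DataFrame the question asks for.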

Re: Spark generate multiple rows based on column value

Try "explode":

import org.apache.spark.sql.functions.{udf, array, explode, col}
import java.text.SimpleDateFormat
import org.joda.time.DateTime

// The date format is an assumption; `format` was not defined in the original post.
val format = new SimpleDateFormat("yyyy-MM-dd")

case class Result(date: String, usage: Double)

def splitUsage = udf { (datediff: Integer, startdate: String, usage: Integer) =>
  if (datediff == 32) {
    val date = new DateTime(format.parse(startdate))
    // One Result per day, each carrying usage/datediff.
    (for (i <- 0 until datediff) yield
      Result(format.format(date.plusDays(i).toDate()), usage.toDouble / datediff.toDouble)).toArray
  } else {
    Array(Result(startdate, usage.toDouble))

val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 =$"*", explode($"dayusage"))
val result =$"Id", $"startdate", $"enddate", $"datediff", $"did",
                         col("col")("date").alias("date"), col("col")("usage").alias("usage"))
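The split arithmetic itself can be verified without a Spark session. A local sketch mirroring the UDF body (the `yyyy-MM-dd` input format is an assumption; the sample figures, 127 units over 32 days, come from the question):

```scala
import java.time.LocalDate

case class Result(date: String, usage: Double)

// Local equivalent of the splitUsage logic: 32 days -> 32 rows of usage/32 each.
def split(datediff: Int, start: LocalDate, usage: Int): Seq[Result] =
  if (datediff == 32)
    (0 until datediff).map(i => Result(start.plusDays(i).toString, usage.toDouble / datediff))
  else Seq(Result(start.toString, usage.toDouble))

val rows = split(32, LocalDate.parse("2017-01-01"), 127)
// 32 rows; each usage is 127/32 = 3.96875
```

Note the `0 until datediff` bound: `0 to datediff` would produce 33 rows for a 32-day period, so the per-day values would no longer sum to the original usage.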