Spark generate multiple rows based on column value


I have a DataFrame whose data looks like this:

Id,startdate,enddate,datediff,did,usage
1,2015-08-26,2015-09-27,32,326-10,127
2,2015-09-27,2015-10-20,21,327-99,534
..
..

My requirement is: if datediff is 32, I need to compute the per-day usage. For the first Id the datediff is 32, so the per-day usage is 127/32. When I collect the result I should get:

Id,startdate,enddate,datediff,day,month,did,usage,perdayusage
1,2015-08-26,2015-09-27,32,26,08,326-10,127,3.96
1,2015-08-26,2015-09-27,32,27,08,326-10,127,3.96
1,2015-08-26,2015-09-27,32,28,08,326-10,127,3.96
.
.
.
1,2015-08-26,2015-09-27,32,27,09,326-10,127,3.96
I tried the following, but I got stuck at the very first step: the line below transforms each input row into a single output row, and I can't work out how to generate multiple rows from a single row based on datediff.


val df2 = df1.select("Id", "startdate", "enddate", "datediff", "did", "usage")
  .withColumn("Day", dayofmonth($"startdate"))
  .withColumn("Month", month($"startdate"))
  .withColumn("perdayusage", getperdayusageudf($"usage", $"datediff"))
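(getperdayusageudf is not shown in the post; presumably it is something along these lines, a hypothetical reconstruction:)

// hypothetical reconstruction of the UDF referenced above
val getperdayusageudf = udf((usage: Int, datediff: Int) => usage.toDouble / datediff)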


How can I get these results back as a DataFrame?


3 REPLIES


I tried this with a UDF. The idea was to collect the values into a StringBuilder and then, in a next step, explode them. I can register the UDF, but I am unable to get any results:

 val myUdf = udf { (col1: Timestamp, col2: Timestamp, col3: Int, sqlContext: SQLContext) =>
    import sqlContext.implicits._
    val sb = StringBuilder.newBuilder   
    if (col3 == 0) {
      val dd = dayofmonth(date_add($"col1", 1))
      val mm = month(date_add($"col1", 1))
      val yy = year(date_add($"col1", 1))
      val normalizedDate = concat(dd, mm, yy)
      sb.append(dd).append(",").append(mm).append(",").append(yy).append(",").append(normalizedDate)
    } else {
      for (i <- 2 until col3) {
        val dd = dayofmonth(date_add($"col1", i))
        val mm = month(date_add($"col1", i))
        val yy = year(date_add($"col1", i))
        val normalizedDate = concat(dd, mm, yy)
        sb.append(dd).append(",").append(mm).append(",").append(yy).append(",").append(normalizedDate)
      }
    }
    sb.toString
  }



java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function3
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:106)
  at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:56)
  ... 52 elided
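A note on the error: udf { ... } here wraps a four-argument function (the SQLContext cannot be passed in as a column), so when the UDF is applied to three columns Spark fails to cast it to scala.Function3. In addition, Column functions such as date_add($"col1", 1) cannot be evaluated on the plain values a UDF receives. Below is a minimal sketch of a UDF that builds the same comma-separated day/month/year values with plain JVM date arithmetic (the names and the semicolon separator are assumptions):

import java.sql.Timestamp
import org.apache.spark.sql.functions.{explode, split, udf}

// takes only data arguments; java.time replaces the Column functions,
// which operate on Columns, not on the values inside a UDF
val buildDays = udf { (start: Timestamp, datediff: Int) =>
  val startDate = start.toLocalDateTime.toLocalDate
  val days = if (datediff == 0) 1 else datediff
  (0 until days).map { i =>
    val d = startDate.plusDays(i)
    s"${d.getDayOfMonth},${d.getMonthValue},${d.getYear}"
  }.mkString(";")
}

// each row's string can then be split and exploded into one row per day:
// df.withColumn("day", explode(split(buildDays($"startdate", $"datediff"), ";")))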




Expert Contributor

I think this is a duplicate of the question for which I already posted an answer.

https://community.hortonworks.com/questions/84507/sql-query-to-sparkdataframe-to-get-date-add-interv...

Here is the working code again; you can convert the result to a DataFrame or carry on with operations on the RDD.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import scala.collection.mutable.ListBuffer
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, GregorianCalendar}

// parser for the input dates, which look like 2015-08-26
val format = new SimpleDateFormat("yyyy-MM-dd")


def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  val dateList = new ListBuffer[String]()
  val calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  while (calendar.getTime().before(enddate)) {
    // Calendar.MONTH is zero-based, hence the + 1
    dateList += calendar.getTime().toString().substring(0, 10) + "," +
      calendar.get(Calendar.DAY_OF_MONTH) + "," + (calendar.get(Calendar.MONTH) + 1)
    calendar.add(Calendar.DATE, 1)
  }
  dateList += calendar.getTime().toString()
  println("\n" + dateList + "\n")
  dateList
}
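// Quick sanity check with the sample row above (uses the `format` defined earlier):
//   generateDates(format.parse("2015-08-26"), format.parse("2015-09-27"))
// prints one "date,day,month" entry per day from 2015-08-26 up to the end date.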

def getRddList(a: String): ListBuffer[(String, String, String, String, String)] = {
  val fields = a.split(",")  // Id, startdate, enddate, datediff, did, usage
  val allDates = new ListBuffer[(String, String, String, String, String)]()
  // one output tuple per generated date, carrying the other columns along
  for (x <- generateDates(format.parse(fields(1)), format.parse(fields(2)))) {
    allDates += ((fields(0), x, fields(3), fields(4), fields(5)))
  }
  allDates
}


val fileRdd = sc.textFile("/data_1/date1")
val myRdd = fileRdd.map(x => getRddList(x)).flatMap(y => y)
myRdd.collect()
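If you want the result back as a DataFrame rather than an RDD, here is a minimal sketch of the conversion (the column names are assumptions; spark is the usual SparkSession, as in spark-shell):

import spark.implicits._

// the second field holds the generated "date,day,month" string
val df = myRdd.toDF("Id", "date_day_month", "datediff", "did", "usage")
df.show(false)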


Try "explode":

import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.{udf, explode, col}
import org.joda.time.DateTime

val format = new SimpleDateFormat("yyyy-MM-dd")

case class Result(date: String, usage: Double)

def splitUsage = udf { (datediff: Integer, startdate: String, usage: Integer) =>
    if (datediff == 32) {
        val date = new DateTime(format.parse(startdate))
        // one Result per day, each carrying the per-day share of the total usage
        (for (i <- 0 to datediff) yield Result(format.format(date.plusDays(i).toDate()),
                                               usage.toDouble / datediff.toDouble)).toArray
    } else {
        Array(Result(startdate, usage.toDouble))
    }
}
val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 = df2.select($"*", explode($"dayusage"))
val result = df3.select($"Id", $"startdate", $"enddate", $"datediff", $"did", 
                        col("col")("date").alias("date"), col("col")("usage").alias("usage"))