
Spark generate multiple rows based on column value


I have a DataFrame that looks like this:


  My requirement is: if datediff is 32, I need to get the per-day usage. For the first id, the datediff is 32, so the per-day usage will be 127/32. When I collect the result I should get:

I tried the following, but I got stuck at the first line: it transforms each input row into one single row, and I can't understand how to produce multiple rows from a single row based on datediff.

val df2 ="Id", "startdate", "enddate", "datediff", "did", "usage")
  .withColumn("Day", dayofmonth($"startdate"))
  .withColumn("Month", month($"startdate"))
  .withColumn("perdayusage", getperdayusageudf($"usage", $"datediff"))

How could I get these results back as a DataFrame?


Re: Spark generate multiple rows based on column value

I tried this with a UDF: I want to collect the values into a StringBuilder and then explode them in a next step. I am able to register the UDF, but I am unable to get results:

val myUdf = udf { (col1: Timestamp, col2: Timestamp, col3: Int, sqlContext: SQLContext) =>
  import sqlContext.implicits._
  val sb = StringBuilder.newBuilder
  if (col3 == 0) {
    val dd = dayofmonth(date_add($"col1", 1))
    val mm = month(date_add($"col1", 1))
    val yy = year(date_add($"col1", 1))
    val normalizedDate = concat(dd, mm, yy)
    sb.append(normalizedDate)
  } else {
    for (i <- 2 until col3) {
      val dd = dayofmonth(date_add($"col1", i))
      val mm = month(date_add($"col1", i))
      val yy = year(date_add($"col1", i))
      val normalizedDate = concat(dd, mm, yy)
      sb.append(normalizedDate)
java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function3
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:106)
  at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:56)
  ... 52 elided
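The exception points at two problems with this attempt: the lambda takes four parameters while the UDF is applied to three columns (so Spark expects a `Function3`), and a `SQLContext` cannot be passed in as a column in any case. Moreover, Column functions such as `date_add`, `dayofmonth`, and `concat` build expressions over columns; they cannot be evaluated on plain values inside a UDF body. A minimal sketch of the per-row logic using plain `java.time` arithmetic instead (the `ddMMyyyy` pattern and the `normalizedDates` name are illustrative assumptions; wrap the function with `org.apache.spark.sql.functions.udf` to register it):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Assumed output pattern, mirroring concat(dd, mm, yy) from the attempt above.
val fmt = DateTimeFormatter.ofPattern("ddMMyyyy")

// Pure per-row logic: no SQLContext, no Column functions.
def normalizedDates(start: LocalDate, datediff: Int): Seq[String] =
  if (datediff == 0) Seq(start.plusDays(1).format(fmt))
  else (2 until datediff).map(i => start.plusDays(i).format(fmt))

// normalizedDates(LocalDate.of(2017, 1, 1), 0) yields Seq("02012017")
```

A function like this can return a `Seq[String]` directly, which avoids the StringBuilder-then-split step entirely: the result column can be passed straight to `explode`.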

Re: Spark generate multiple rows based on column value

Expert Contributor

I think this is a duplicate of a question I have already answered.

Here is the working code again; you can convert the result to a DataFrame or continue with RDD operations:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, GregorianCalendar}

// The date format is an assumption; `format` was not defined in the original post.
val format = new SimpleDateFormat("yyyy-MM-dd")

// Build one "date,dayOfMonth,month" string per day in [startdate, enddate).
def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  val dateList = new ListBuffer[String]()
  val calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  while (calendar.getTime().before(enddate)) {
    dateList += calendar.getTime().toString().substring(0, 10) + "," +
      calendar.get(Calendar.DAY_OF_MONTH) + "," + calendar.get(Calendar.MONTH)
    calendar.add(Calendar.DATE, 1)

// Expand one CSV line (id,start,end,datediff,did,usage) into one tuple per day.
def getRddList(a: String): ListBuffer[(String, String, String, String, String)] = {
  val allDates = new ListBuffer[(String, String, String, String, String)]()
  val fields = a.split(",")
  for (x <- generateDates(format.parse(fields(1)), format.parse(fields(2)))) {
    allDates += ((fields(0), x, fields(3), fields(4), fields(5)))

val fileRdd = sc.textFile("/data_1/date1")
val myRdd = => getRddList(x)).flatMap(y => y)
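The same fan-out can be sanity-checked locally before running it on the RDD. A self-contained sketch (the sample line and the use of `java.time` in place of `GregorianCalendar` are illustrative assumptions):

```scala
import java.time.LocalDate

// Days in [start, end), analogous to generateDates above.
def datesBetween(start: LocalDate, end: LocalDate): Seq[LocalDate] =
  Iterator.iterate(start)(_.plusDays(1)).takeWhile(_.isBefore(end)).toSeq

// Expand one CSV line (id,start,end,datediff,did,usage) into one tuple per day.
def expandLine(line: String): Seq[(String, String, String, String, String)] = {
  val f = line.split(",")
  datesBetween(LocalDate.parse(f(1)), LocalDate.parse(f(2)))
    .map(d => (f(0), d.toString, f(3), f(4), f(5)))

val rows = expandLine("1,2017-01-01,2017-01-04,3,9,127")
// rows holds three tuples, one per day 2017-01-01 .. 2017-01-03
```

Applied per line with `flatMap`, this gives one record per day; with the SQL implicits in scope, `toDF(...)` on the resulting RDD then yields the DataFrame the question asks for.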

Re: Spark generate multiple rows based on column value

Try "explode":

import org.apache.spark.sql.functions.{udf, array, explode, col}
import java.text.SimpleDateFormat
import org.joda.time.DateTime

// The date format is an assumption; `format` was not defined in the original post.
val format = new SimpleDateFormat("yyyy-MM-dd")

case class Result(date: String, usage: Double)

def splitUsage = udf { (datediff: Integer, startdate: String, usage: Integer) =>
  if (datediff == 32) {
    val date = new DateTime(format.parse(startdate))
    // One Result per day, each carrying usage/datediff.
    (for (i <- 0 until datediff) yield
      Result(format.format(date.plusDays(i).toDate()), usage.toDouble / datediff.toDouble)).toArray
  } else {
    Array(Result(startdate, usage.toDouble))

val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 =$"*", explode($"dayusage"))
val result =$"Id", $"startdate", $"enddate", $"datediff", $"did",
                         col("col")("date").alias("date"), col("col")("usage").alias("usage"))
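The split arithmetic itself can be verified without a Spark session. A local sketch mirroring the UDF body (the `yyyy-MM-dd` input format is an assumption; the sample figures, 127 units over 32 days, come from the question):

```scala
import java.time.LocalDate

case class Result(date: String, usage: Double)

// Local equivalent of the splitUsage logic: 32 days -> 32 rows of usage/32 each.
def split(datediff: Int, start: LocalDate, usage: Int): Seq[Result] =
  if (datediff == 32)
    (0 until datediff).map(i => Result(start.plusDays(i).toString, usage.toDouble / datediff))
  else Seq(Result(start.toString, usage.toDouble))

val rows = split(32, LocalDate.parse("2017-01-01"), 127)
// 32 rows; each usage is 127/32 = 3.96875
```

Note the `0 until datediff` bound: `0 to datediff` would produce 33 rows for a 32-day period, so the per-day values would no longer sum to the original usage.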