Created 02-22-2017 06:22 AM
I have a dataframe whose data looks like this:
```
Id,startdate,enddate,datediff,did,usage
1,2015-08-26,2015-09-27,32,326-10,127
2,2015-09-27,2015-10-20,21,327-99,534
..
..
```

My requirement is: given the datediff, I need to get the per-day usage. For the first Id the datediff is 32, so the per-day usage is 127/32. When I collect the result I should get:

```
Id,startdate,enddate,datediff,day,month,did,usage,perdayusage
1,2015-08-26,2015-09-27,32,26,08,326-10,127,3.96
1,2015-08-26,2015-09-27,32,27,08,326-10,127,3.96
1,2015-08-26,2015-09-27,32,28,08,326-10,127,3.96
.
.
.
1,2015-08-26,2015-09-27,32,27,09,326-10,127,3.96
```

This is what I tried. I am stuck at the very first step, since the line below transforms each input row into a single output row, and I can't see how to produce multiple rows from a single row based on datediff:

```scala
val df2 = df1.select("Id", "startdate", "enddate", "datediff", "did", "usage")
  .withColumn("Day", dayofmonth($"startdate"))
  .withColumn("Month", month($"startdate"))
  .withColumn("perdayusage", getperdayusageudf($"usage", $"datediff"))
```

How can I get these results back as a dataframe?
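Stripped of Spark, the required expansion is just a per-day flatMap over each record. A plain-Scala sketch of that logic, using java.time (the `Record` case class and `expand` helper are illustrative, not from the thread):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// One input record, mirroring the columns in the sample data.
case class Record(id: Int, startdate: String, enddate: String,
                  datediff: Int, did: String, usage: Int)

// Expand a record into one row per day in [startdate, enddate),
// carrying day-of-month, month, and the per-day usage (usage / datediff).
def expand(r: Record): Seq[(Int, String, String, Int, Int, Int, String, Int, Double)] = {
  val start = LocalDate.parse(r.startdate)
  val days  = ChronoUnit.DAYS.between(start, LocalDate.parse(r.enddate)).toInt
  (0 until days).map { i =>
    val d = start.plusDays(i)
    (r.id, r.startdate, r.enddate, r.datediff,
     d.getDayOfMonth, d.getMonthValue, r.did, r.usage,
     r.usage.toDouble / r.datediff)
  }
}
```

In Spark the same shape is achieved by producing an array column and exploding it, as the answers below do.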
Created 02-22-2017 11:28 AM
I tried this with a UDF: I want to collect the values into a StringBuilder and then explode them in a later step. I can register the UDF, but I am unable to get results:
```scala
val myUdf = udf { (col1: Timestamp, col2: Timestamp, col3: Int, sqlContext: SQLContext) =>
  import sqlContext.implicits._
  val sb = StringBuilder.newBuilder
  if (col3 == 0) {
    val dd = dayofmonth(date_add($"col1", 1))
    val mm = month(date_add($"col1", 1))
    val yy = year(date_add($"col1", 1))
    val normalizedDate = concat(dd, mm, yy)
    sb.append(dd).append(",").append(mm).append(",").append(yy).append(",").append(normalizedDate)
  } else {
    for (i <- 2 until col3) {
      val dd = dayofmonth(date_add($"col1", i))
      val mm = month(date_add($"col1", i))
      val yy = year(date_add($"col1", i))
      val normalizedDate = concat(dd, mm, yy)
      sb.append(dd).append(",").append(mm).append(",").append(yy).append(",").append(normalizedDate)
    }
  }
  sb.toString
}
```

It fails with:

```
java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function3
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:106)
  at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:56)
  ... 52 elided
```
Created 02-24-2017 05:00 AM
I think this is a duplicate of the question for which I already posted an answer.
Here is the working code again; you can convert the result to a dataframe or keep operating on the RDD:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
import java.text.SimpleDateFormat
import java.util.{GregorianCalendar, Date}
import java.util.Calendar

// Date format used to parse startdate/enddate (left implicit in the original post).
val format = new SimpleDateFormat("yyyy-MM-dd")

// Build one "date,dayOfMonth,month" entry per day between startdate and enddate.
def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  var dateList = new ListBuffer[String]()
  var calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  while (calendar.getTime().before(enddate)) {
    dateList += calendar.getTime().toString().substring(0, 10) + "," +
      calendar.get(Calendar.DAY_OF_MONTH) + "," +
      calendar.get(Calendar.MONTH)
    calendar.add(Calendar.DATE, 1)
  }
  dateList += calendar.getTime().toString()
  println("\n" + dateList + "\n")
  dateList
}

// Expand one CSV line into one tuple per generated date, keeping the other columns.
def getRddList(a: String): ListBuffer[(String, String, String, String, String)] = {
  var allDates = new ListBuffer[(String, String, String, String, String)]()
  for (x <- generateDates(format.parse(a.split(",")(1)), format.parse(a.split(",")(2)))) {
    allDates += ((a.split(",")(0), x, a.split(",")(3), a.split(",")(4), a.split(",")(5)))
  }
  allDates
}

val fileRdd = sc.textFile("/data_1/date1")
val myRdd = fileRdd.map(x => getRddList(x)).flatMap(y => y)
myRdd.collect()
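As a side note, the `GregorianCalendar` loop above can be written more compactly with java.time. One pitfall with the original: `Calendar.MONTH` is zero-based (August is 7), while `getMonthValue` below is one-based. A standalone sketch (the `datesBetween` name is mine, not from the answer):

```scala
import java.time.LocalDate
import scala.collection.mutable.ListBuffer

// Generate "date,dayOfMonth,month" strings for every day in [start, end),
// mirroring what generateDates builds with GregorianCalendar
// (but with a one-based month, unlike Calendar.MONTH).
def datesBetween(start: LocalDate, end: LocalDate): ListBuffer[String] = {
  val out = new ListBuffer[String]()
  var d = start
  while (d.isBefore(end)) {
    out += s"$d,${d.getDayOfMonth},${d.getMonthValue}"
    d = d.plusDays(1)
  }
  out
}
```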
Created 02-24-2017 09:13 AM
Try "explode":
```scala
import org.apache.spark.sql.functions.{udf, array, explode, col}
import org.joda.time.DateTime
import java.text.SimpleDateFormat

// Date format used to parse/print startdate (left implicit in the original post).
val format = new SimpleDateFormat("yyyy-MM-dd")

case class Result(date: String, usage: Double)

// Split a row's usage evenly across its days; otherwise keep it as a single entry.
// (Note: the original had plusDays(2) and "0 to datediff", which would repeat the
// same date and emit one extra row; corrected to plusDays(i) and "0 until datediff".)
def splitUsage = udf { (datediff: Integer, startdate: String, usage: Integer) =>
  if (datediff == 32) {
    val date = new DateTime(format.parse(startdate))
    (for (i <- 0 until datediff)
      yield Result(format.format(date.plusDays(i).toDate()), usage.toDouble / datediff.toDouble)).toArray
  } else {
    Array(Result(startdate, usage.toDouble))
  }
}

val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 = df2.select($"*", explode($"dayusage"))
val result = df3.select($"Id", $"startdate", $"enddate", $"datediff", $"did",
  col("col")("date").alias("date"),
  col("col")("usage").alias("usage"))
```