Created 01-11-2017 06:10 PM
I have rows like the ones below.
+--------------+-------------------+-------------------+
|            id|    line_start_date|      line_end_date|
+--------------+-------------------+-------------------+
|          ID1 | 8/15/2016 00:00:00| 8/21/2016 23:55:59|
|          ID2 | 1/25/2016 00:00:00| 1/31/2016 23:55:59|
|          ID3 | 2/29/2016 00:00:00| 2/29/2016 23:55:59|
|          ID4 |  2/8/2016 00:00:00| 2/14/2016 23:55:59|
|          ID5 | 7/25/2016 00:00:00| 7/31/2016 23:55:59|
|          ID6 |  8/1/2016 00:00:00|  8/7/2016 23:55:59|
+--------------+-------------------+-------------------+
Note: the date format is "MM/dd/yyyy HH:mm:ss" in UTC.
I want to generate the individual dates between the start date and end date for each of the ids.
As a result I will have more rows and can easily do a groupBy to perform the aggregation I want.
Example Output Required:
+--------------+-------------------+
|            id|              dates|
+--------------+-------------------+
|          ID1 | 8/15/2016 00:00:00|
|          ID1 | 8/16/2016 00:00:00|
|          ID1 | 8/17/2016 00:00:00|
|          ... |               ... |
+--------------+-------------------+
How can I do this with the DataFrame API? I have searched for hours with no clue yet!
Created 01-11-2017 08:55 PM
Something similar using RDDs
Steps
import java.util.{Calendar, Date, GregorianCalendar}
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext

def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local[*]", "app1")
  // Input: CSV lines of the form "id,start_date,end_date"
  val fileRdd = sc.textFile("inFile")
  // Expand each line into one (id, date) record per day in its range
  val explodedRdd = fileRdd.map(x => getRddList(x)).flatMap(y => y)
  explodedRdd.saveAsTextFile("outDir")
}

// Returns every day from startdate to enddate, inclusive
def getDaysBetweenDates(startdate: Date, enddate: Date): ListBuffer[String] = {
  val dateList = new ListBuffer[String]()
  val calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  while (calendar.getTime().before(enddate)) {
    dateList += calendar.getTime().toString()
    calendar.add(Calendar.DATE, 1)
  }
  dateList += calendar.getTime().toString()
  dateList
}

// Expands one CSV line into (id, date) pairs, one per day in the range.
// Adjust the SimpleDateFormat pattern to match your data (the question
// uses "MM/dd/yyyy HH:mm:ss").
def getRddList(a: String): ListBuffer[(String, String)] = {
  val allDates = new ListBuffer[(String, String)]()
  val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
  val fields = a.split(",")
  for (x <- getDaysBetweenDates(format.parse(fields(1)), format.parse(fields(2)))) {
    allDates += ((fields(0), x))
  }
  allDates
}
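For example (assuming the functions above are in scope), a single input line expands into one tuple per day; the input values here are hypothetical:

// Input line in the yyyy-MM-dd format the parser expects
val pairs = getRddList("ID1,2016-08-15,2016-08-17")
// pairs holds three (id, date) tuples, one each for Aug 15, 16, and 17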
Created 01-11-2017 08:57 PM
++ You could then convert the RDD to a DataFrame if required.
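Alternatively, the whole thing can stay in the DataFrame API. This is only a sketch, assuming Spark 2.x with a DataFrame `df` holding the question's columns (id, line_start_date, line_end_date as strings); the `daysBetween` helper is illustrative, not a built-in:

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{col, explode, udf}

// UDF that expands a [start, end] range into one date string per day.
// The formatter is created inside the closure to avoid serialization issues.
val daysBetween = udf { (start: String, end: String) =>
  val fmt = DateTimeFormatter.ofPattern("M/d/yyyy HH:mm:ss")
  val s = LocalDate.parse(start, fmt)
  val e = LocalDate.parse(end, fmt)
  Iterator.iterate(s)(_.plusDays(1)).takeWhile(!_.isAfter(e)).map(_.toString).toSeq
}

// explode() turns the per-row array of dates into one row per date
val exploded = df
  .withColumn("dates", explode(daysBetween(col("line_start_date"), col("line_end_date"))))
  .select("id", "dates")

A groupBy on `exploded` then gives the per-day aggregation the question asks for.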
Created 02-23-2017 07:11 PM
works perfectly now.
Created 01-13-2017 11:43 PM
Thanks! I will be testing it soon and will be accepting your answer if it works out.