Support Questions

Find answers, ask questions, and share your expertise

how to split row into multiple rows on the basis of date using spark with scala?

avatar
New Contributor

I have a dataframe that contains of rows like below and i need to split this data to get month wise series on the basis of pa_start_date and pa_end_date and create a new column period start and end date.

i/p dataframe df is

    p_id pa_id  p_st_date   p_end_date     pa_start_date   pa_end_date  
    p1   pa1    2-Jan-18    5-Dec-18           2-Mar-18        8-Aug-18       
    p1   pa2    3-Jan-18    8-Dec-18           6-Mar-18       10-Nov-18   
    p1   pa3    1-Jan-17    1-Dec-17           9-Feb-17       20-Apr-17

o/p is

p_id pa_id  p_st_date       p_end_date     pa_start_date    pa_end_date   period_start_date  period_end_date
    p1	 pa1	2-Jan-18	5-Dec-18   2-Mar-18	     8-Aug-18	  2-Mar-18 31-Mar-18
    p1	 pa1	2-Jan-18	5-Dec-18   2-Mar-18	     8-Aug-18	  1-Apr-18 30-Apr-18
    p1	 pa1	2-Jan-18	5-Dec-18   2-Mar-18	     8-Aug-18	  1-May-18 31-May-18
    p1	 pa1	2-Jan-18	5-Dec-18   2-Mar-18	     8-Aug-18	  1-Jun-18 30-Jun-18
    p1	 pa1	2-Jan-18	5-Dec-18   2-Mar-18	     8-Aug-18	  1-Jul-18 31-Jul-18
    p1	 pa1	2-Jan-18	5-Dec-18   2-Mar-18	     8-Aug-18	  1-Aug-18 31-Aug-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    6-Mar-18 31-Mar-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Apr-18 30-Apr-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-May-18 31-May-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Jun-18 30-Jun-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Jul-18 31-Jul-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Aug-18 31-Aug-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Sep-18 30-Sep-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Oct-18 30-Oct-18
    p1   pa2    3-Jan-18        8-Dec-18   6-Mar-18          10-Nov-18    1-Nov-18 30-Nov-18
    p1	 pa3	1-Jan-17	1-Dec-17   9-Feb-17	     20-Apr-17	  9-Feb-17 28-Feb-17
    p1	 pa3	1-Jan-17	1-Dec-17   9-Feb-17	     20-Apr-17	  1-Mar-17 31-Mar-17
    p1	 pa3	1-Jan-17	1-Dec-17   9-Feb-17	     20-Apr-17	  1-Apr-17 30-Apr-17



5 REPLIES 5

avatar
Super Collaborator

avatar
New Contributor

@Bernhard Walter : please could you help

avatar
Super Collaborator

@AArora, is the requirement to create multiple rows from one row where you need to have all "First & Last Day of the Month" between pa_start_date pa_end_date as the period_end_date?

avatar
Super Collaborator

May not be the best approach, but we could do this in a 2 step process.

Step 1
Load the content to a data frame
Apply an UDF to derive a set of period_end_date for the given row
Explode the row based on the period_end_date
Step 2
Derive the period_start_date for the period_end_date based on the pa_start_date

You can either derive end date first and start date next or vice versa.

Below is a code snippet. Can be optimized further

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
import java.util.Date
import scala.collection.mutable.ListBuffer
import java.util.GregorianCalendar
import java.util.Calendar
import java.text.SimpleDateFormat
val csv = sc.textFile("/user/hdfs/ak/spark/197905/")
val rows = csv.map(line => line.split(",").map(_.trim))
val rdd = rows.map(row => Row(row(0),row(1),row(2),row(3),row(4),row(5)))
val schema = new StructType().add(StructField("c0", StringType, true)).add(StructField("c1", StringType, true)).add(StructField("c2", StringType, true)).add(StructField("c3", StringType, true)).add(StructField("c4", StringType, true)).add(StructField("c5", StringType, true))
val df = sqlContext.createDataFrame(rdd, schema)<br>
df.registerTempTable("raw_data");
<br><br>def getLastDateOfMonth(date:Date) : Date ={
        val cal = Calendar.getInstance()
        cal.setTime(date);
        cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH));
        cal.getTime();
    }
 def getFirstDateOfMonth(date:Date) : Date ={
        val cal = Calendar.getInstance()
        cal.setTime(date);
        cal.set(Calendar.DAY_OF_MONTH, cal.getActualMinimum(Calendar.DAY_OF_MONTH));
        cal.getTime();
    }
  def 
  getLastDaysBetweenDates = (formatString:String, startDateString:String, endDateString:String) => {
    val format = new SimpleDateFormat(formatString)
    val startdate = getLastDateOfMonth(format.parse(startDateString))
    val enddate =getLastDateOfMonth(format.parse(endDateString))
    var dateList = new ListBuffer[Date]()
    var calendar = new GregorianCalendar()
    calendar.setTime(startdate)
    var yearMonth="";
    var maxDates = scala.collection.mutable.Map[String, Date]()
    while (calendar.getTime().before(enddate)) {
      yearMonth = calendar.getTime().getYear()+"_"+calendar.getTime.getMonth()
      maxDates += (yearMonth -> calendar.getTime())
      calendar.add(Calendar.DATE, 1)
    }
    maxDates += (yearMonth -> calendar.getTime())
    for(eachMonth <- maxDates.keySet){
      dateList += maxDates(eachMonth) 
    }
    var dateListString = "";
     for( date <- dateList.sorted){
    dateListString=dateListString+","+format.format(date)
  }
     dateListString.substring(1, dateListString.length())
  }
def 
getFirstDateFromLastDateAndReference = (formatString:String, refDateString:String, lastDate:String)  => {
    val format = new SimpleDateFormat(formatString)
    val firstDay = getFirstDateOfMonth(format.parse(lastDate))
    val year = firstDay.getYear;
    val month = firstDay.getMonth;
    val refDate = format.parse(refDateString)
    val cal = Calendar.getInstance()
    cal.setTime(refDate)
    val refDateTime = cal.getTime();
    val refYear=refDateTime.getYear;
    val refMonth = refDateTime.getMonth();
    if(year==refYear&& month==refMonth){
      refDateString
    }else{
      format.format(firstDay)
    }
 }
  sqlContext.udf.register("lastday",getLastDaysBetweenDates)
sqlContext.udf.register("firstday",getFirstDateFromLastDateAndReference)
  sqlContext.sql("select *,lastday('d-MMM-yy',c4,c5) from raw_data").show();
  sqlContext.sql("select c0,c1,c2,c3,c4,c5,explode(split(lastday('d-MMM-yy',c4,c5),',')) as lastday from hello").registerTempTable("data_with_end_date");
  sqlContext.sql("select c0,c1,c2,c3,c4,c5,lastday,firstday('d-MMM-yy',c4,lastday) from data_with_end_date").show()

I used 2 udfs here
1) getLastDaysBetweenDates - Consumes a date format, start and end dates and returns a list of Month End Dates in this range
2) getFirstDateFromLastDateAndReference - Consumes a date format, a start date and an end date. Returns the first date of the month based on the last date. However for the first month, it returns the pa_start_date instead of the First Calendar date.

avatar
Super Collaborator

Output Data of the form

77684-screen-shot-2018-06-14-at-72629-pm.png