how to split row into multiple rows on the basis of date using spark with scala?


New Contributor

I have a dataframe containing rows like the ones below. I need to split each row into a month-wise series based on pa_start_date and pa_end_date, and add two new columns: period_start_date and period_end_date.

The input dataframe df is:

    p_id pa_id  p_st_date   p_end_date     pa_start_date   pa_end_date  
    p1   pa1    2-Jan-18    5-Dec-18           2-Mar-18        8-Aug-18       
    p1   pa2    3-Jan-18    8-Dec-18           6-Mar-18       10-Nov-18   
    p1   pa3    1-Jan-17    1-Dec-17           9-Feb-17       20-Apr-17

The expected output is:

    p_id  pa_id  p_st_date  p_end_date  pa_start_date  pa_end_date  period_start_date  period_end_date
    p1    pa1    2-Jan-18   5-Dec-18    2-Mar-18       8-Aug-18     2-Mar-18           31-Mar-18
    p1    pa1    2-Jan-18   5-Dec-18    2-Mar-18       8-Aug-18     1-Apr-18           30-Apr-18
    p1    pa1    2-Jan-18   5-Dec-18    2-Mar-18       8-Aug-18     1-May-18           31-May-18
    p1    pa1    2-Jan-18   5-Dec-18    2-Mar-18       8-Aug-18     1-Jun-18           30-Jun-18
    p1    pa1    2-Jan-18   5-Dec-18    2-Mar-18       8-Aug-18     1-Jul-18           31-Jul-18
    p1    pa1    2-Jan-18   5-Dec-18    2-Mar-18       8-Aug-18     1-Aug-18           31-Aug-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    6-Mar-18           31-Mar-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Apr-18           30-Apr-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-May-18           31-May-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Jun-18           30-Jun-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Jul-18           31-Jul-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Aug-18           31-Aug-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Sep-18           30-Sep-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Oct-18           31-Oct-18
    p1    pa2    3-Jan-18   8-Dec-18    6-Mar-18       10-Nov-18    1-Nov-18           30-Nov-18
    p1    pa3    1-Jan-17   1-Dec-17    9-Feb-17       20-Apr-17    9-Feb-17           28-Feb-17
    p1    pa3    1-Jan-17   1-Dec-17    9-Feb-17       20-Apr-17    1-Mar-17           31-Mar-17
    p1    pa3    1-Jan-17   1-Dec-17    9-Feb-17       20-Apr-17    1-Apr-17           30-Apr-17



5 REPLIES 5

Re: how to split row into multiple rows on the basis of date using spark with scala?

New Contributor

@Bernhard Walter: could you please help?


Re: how to split row into multiple rows on the basis of date using spark with scala?

Super Collaborator

@AArora, is the requirement to create multiple rows from one row, one for each month between pa_start_date and pa_end_date, with the first and last day of that month as period_start_date and period_end_date?


Re: how to split row into multiple rows on the basis of date using spark with scala?

Super Collaborator

This may not be the best approach, but we could do it in a two-step process.

Step 1
Load the content into a data frame.
Apply a UDF to derive the set of period_end_date values for each row.
Explode the row on period_end_date.
Step 2
Derive the period_start_date for each period_end_date based on pa_start_date.

You can derive the end date first and the start date next, or vice versa.
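
To make the two steps concrete without Spark, here is a minimal sketch on plain Scala values using java.time. The names MonthSplit, Period, monthEnds, periodStart and splitByMonth are hypothetical and only for illustration. It assumes, as in the expected output of the question, that the last period ends on the month end rather than on pa_end_date.

```scala
import java.time.{LocalDate, YearMonth}

object MonthSplit {
  // Hypothetical record for one output period
  case class Period(start: LocalDate, end: LocalDate)

  // Step 1: one month-end date for every calendar month touched by [start, end]
  def monthEnds(start: LocalDate, end: LocalDate): List[LocalDate] = {
    val last = YearMonth.from(end)
    Iterator.iterate(YearMonth.from(start))(_.plusMonths(1))
      .takeWhile(!_.isAfter(last))
      .map(_.atEndOfMonth())
      .toList
  }

  // Step 2: in the first month the period starts on the range start, otherwise on day 1
  def periodStart(rangeStart: LocalDate, monthEnd: LocalDate): LocalDate =
    if (YearMonth.from(monthEnd) == YearMonth.from(rangeStart)) rangeStart
    else monthEnd.withDayOfMonth(1)

  // Both steps combined: one Period per month
  def splitByMonth(start: LocalDate, end: LocalDate): List[Period] =
    monthEnds(start, end).map(e => Period(periodStart(start, e), e))
}
```

For the pa1 row, MonthSplit.splitByMonth(LocalDate.of(2018, 3, 2), LocalDate.of(2018, 8, 8)) yields six periods, from 2-Mar-18..31-Mar-18 through 1-Aug-18..31-Aug-18. In Spark, the list returned by step 1 is what gets exploded into rows.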

The Spark code snippet is below. It can be optimized further.

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import java.util.Date
import scala.collection.mutable.ListBuffer
import java.util.GregorianCalendar
import java.util.Calendar
import java.text.SimpleDateFormat

// Load the comma-separated input; every column is kept as a string (c0..c5)
val csv = sc.textFile("/user/hdfs/ak/spark/197905/")
val rows = csv.map(line => line.split(",").map(_.trim))
val rdd = rows.map(row => Row(row(0), row(1), row(2), row(3), row(4), row(5)))
val schema = StructType((0 to 5).map(i => StructField(s"c$i", StringType, nullable = true)))
val df = sqlContext.createDataFrame(rdd, schema)
// Expose the data frame to SQL as raw_data
df.registerTempTable("raw_data")

// Last calendar day of the month containing `date`
def getLastDateOfMonth(date: Date): Date = {
  val cal = Calendar.getInstance()
  cal.setTime(date)
  cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH))
  cal.getTime
}

// First calendar day of the month containing `date`
def getFirstDateOfMonth(date: Date): Date = {
  val cal = Calendar.getInstance()
  cal.setTime(date)
  cal.set(Calendar.DAY_OF_MONTH, cal.getActualMinimum(Calendar.DAY_OF_MONTH))
  cal.getTime
}

// UDF body: comma-separated month-end dates for every month from the start date to the end date
val getLastDaysBetweenDates = (formatString: String, startDateString: String, endDateString: String) => {
  val format = new SimpleDateFormat(formatString)
  val startdate = getLastDateOfMonth(format.parse(startDateString))
  val enddate = getLastDateOfMonth(format.parse(endDateString))
  val calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  var yearMonth = ""
  // Keyed by year_month: walking day by day leaves the latest date seen in each month
  val maxDates = scala.collection.mutable.Map[String, Date]()
  while (calendar.getTime.before(enddate)) {
    yearMonth = s"${calendar.get(Calendar.YEAR)}_${calendar.get(Calendar.MONTH)}"
    maxDates += (yearMonth -> calendar.getTime)
    calendar.add(Calendar.DATE, 1)
  }
  // The loop stops on enddate itself, so record the final month end explicitly
  maxDates += (yearMonth -> calendar.getTime)
  maxDates.values.toList.sorted.map(d => format.format(d)).mkString(",")
}

// UDF body: period start for a given month-end date.
// Returns refDateString itself when lastDate falls in the same month as the reference date,
// otherwise the first calendar day of lastDate's month.
val getFirstDateFromLastDateAndReference = (formatString: String, refDateString: String, lastDate: String) => {
  val format = new SimpleDateFormat(formatString)
  val firstDay = getFirstDateOfMonth(format.parse(lastDate))
  val refDate = format.parse(refDateString)
  // getYear/getMonth are deprecated on java.util.Date but sufficient for an equality check
  if (firstDay.getYear == refDate.getYear && firstDay.getMonth == refDate.getMonth) {
    refDateString
  } else {
    format.format(firstDay)
  }
}

sqlContext.udf.register("lastday", getLastDaysBetweenDates)
sqlContext.udf.register("firstday", getFirstDateFromLastDateAndReference)
// Inspect the month-end list per row
sqlContext.sql("select *, lastday('d-MMM-yy', c4, c5) from raw_data").show()
// Step 1: explode the month-end list into one row per month
sqlContext.sql("select c0, c1, c2, c3, c4, c5, explode(split(lastday('d-MMM-yy', c4, c5), ',')) as lastday from raw_data").registerTempTable("data_with_end_date")
// Step 2: derive the period start for each month end
sqlContext.sql("select c0, c1, c2, c3, c4, c5, lastday, firstday('d-MMM-yy', c4, lastday) from data_with_end_date").show()

I used two UDFs here:
1) getLastDaysBetweenDates - Takes a date format, a start date and an end date, and returns a comma-separated list of the month-end dates in that range.
2) getFirstDateFromLastDateAndReference - Takes a date format, a reference start date (pa_start_date) and a month-end date, and returns the first day of that month. For the first month, however, it returns pa_start_date instead of the first calendar day.
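
Both UDFs build a SimpleDateFormat from the format string, which resolves month names such as "Mar" using the JVM default locale. If that is a concern, a minimal sketch of the same 'd-MMM-yy' parsing and formatting with java.time and an explicit locale could look like this. The object PaDates and its members are hypothetical names, and pinning Locale.ENGLISH is an assumption about the data.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

object PaDates {
  // Same pattern that is passed to the UDFs; the explicit locale is an assumption
  val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("d-MMM-yy", Locale.ENGLISH)

  // Parse a value such as "2-Mar-18" (two-digit years resolve to 2000-2099)
  def parse(s: String): LocalDate = LocalDate.parse(s.trim, fmt)

  // Format a date back into the same textual form
  def format(d: LocalDate): String = d.format(fmt)
}
```

DateTimeFormatter is immutable, so one instance can be shared across calls instead of creating a new formatter inside every UDF invocation.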


Re: how to split row into multiple rows on the basis of date using spark with scala?

Super Collaborator

The output data has the form shown in the attached screenshot (77684-screen-shot-2018-06-14-at-72629-pm.png).
