Created 06-13-2018 02:33 PM
I have a dataframe that contains rows like the ones below, and I need to split each row into a month-wise series based on pa_start_date and pa_end_date, adding new period_start_date and period_end_date columns.
The input dataframe df is:
p_id pa_id p_st_date p_end_date pa_start_date pa_end_date
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17
The expected output is:
p_id pa_id p_st_date p_end_date pa_start_date pa_end_date period_start_date period_end_date
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 2-Mar-18 31-Mar-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Apr-18 30-Apr-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-May-18 31-May-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Jun-18 30-Jun-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Jul-18 31-Jul-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Aug-18 31-Aug-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 6-Mar-18 31-Mar-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Apr-18 30-Apr-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-May-18 31-May-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Jun-18 30-Jun-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Jul-18 31-Jul-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Aug-18 31-Aug-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Sep-18 30-Sep-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Oct-18 31-Oct-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Nov-18 30-Nov-18
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17 9-Feb-17 28-Feb-17
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17 1-Mar-17 31-Mar-17
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17 1-Apr-17 30-Apr-17
Created 06-13-2018 06:26 PM
Created 06-14-2018 02:02 AM
@Bernhard Walter: could you please help?
Created 06-14-2018 12:23 PM
@AArora, is the requirement to create multiple rows from one row, with one row for every month between pa_start_date and pa_end_date, where period_start_date and period_end_date are the first and last day of that month?
Created 06-14-2018 11:19 PM
May not be the best approach, but we could do this in a 2 step process.
Step 1
Load the content to a data frame
Apply a UDF to derive the set of period_end_date values for the given row
Explode the row based on the period_end_date
Step 2
Derive the period_start_date for the period_end_date based on the pa_start_date
You can either derive end date first and start date next or vice versa.
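To make the two steps concrete without Spark, here is a minimal sketch in plain Scala using java.time (Java 8 or later). The names MonthSplitSketch, Pa, Period, monthEnds, periodStart and split are made up for illustration and are not part of the snippet in this post; flatMap plays the role of explode.

```scala
import java.time.{LocalDate, YearMonth}

object MonthSplitSketch {
  // Hypothetical row types; only the columns needed for the split are modelled.
  final case class Pa(paId: String, paStart: LocalDate, paEnd: LocalDate)
  final case class Period(paId: String, periodStart: LocalDate, periodEnd: LocalDate)

  // Step 1: one month-end date for every calendar month touched by [start, end].
  def monthEnds(start: LocalDate, end: LocalDate): List[LocalDate] = {
    val lastMonth = YearMonth.from(end)
    Iterator
      .iterate(YearMonth.from(start))(_.plusMonths(1))
      .takeWhile(ym => !ym.isAfter(lastMonth))
      .map(_.atEndOfMonth())
      .toList
  }

  // Step 2: the first day of the month, except in the first month,
  // where pa_start_date is kept.
  def periodStart(paStart: LocalDate, monthEnd: LocalDate): LocalDate = {
    val firstOfMonth = monthEnd.withDayOfMonth(1)
    if (firstOfMonth.isBefore(paStart)) paStart else firstOfMonth
  }

  // "Explode": each input row becomes one output row per month.
  def split(rows: Seq[Pa]): Seq[Period] =
    rows.flatMap { r =>
      monthEnds(r.paStart, r.paEnd)
        .map(end => Period(r.paId, periodStart(r.paStart, end), end))
    }
}
```

For the pa3 row in the question (9-Feb-17 to 20-Apr-17) this yields three periods: 9-Feb-17 to 28-Feb-17, 1-Mar-17 to 31-Mar-17 and 1-Apr-17 to 30-Apr-17.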
Below is a code snippet. It can be optimized further.
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
import java.util.Date
import scala.collection.mutable.ListBuffer
import java.util.GregorianCalendar
import java.util.Calendar
import java.text.SimpleDateFormat
val csv = sc.textFile("/user/hdfs/ak/spark/197905/")
val rows = csv.map(line => line.split(",").map(_.trim))
val rdd = rows.map(row => Row(row(0),row(1),row(2),row(3),row(4),row(5)))
val schema = new StructType().add(StructField("c0", StringType, true)).add(StructField("c1", StringType, true)).add(StructField("c2", StringType, true)).add(StructField("c3", StringType, true)).add(StructField("c4", StringType, true)).add(StructField("c5", StringType, true))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("raw_data");
def getLastDateOfMonth(date:Date) : Date ={
val cal = Calendar.getInstance()
cal.setTime(date);
cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH));
cal.getTime();
}
def getFirstDateOfMonth(date:Date) : Date ={
val cal = Calendar.getInstance()
cal.setTime(date);
cal.set(Calendar.DAY_OF_MONTH, cal.getActualMinimum(Calendar.DAY_OF_MONTH));
cal.getTime();
}
def getLastDaysBetweenDates = (formatString:String, startDateString:String, endDateString:String) => {
val format = new SimpleDateFormat(formatString)
val startdate = getLastDateOfMonth(format.parse(startDateString))
val enddate =getLastDateOfMonth(format.parse(endDateString))
var dateList = new ListBuffer[Date]()
var calendar = new GregorianCalendar()
calendar.setTime(startdate)
var yearMonth="";
var maxDates = scala.collection.mutable.Map[String, Date]()
while (calendar.getTime().before(enddate)) {
yearMonth = calendar.getTime().getYear()+"_"+calendar.getTime.getMonth()
maxDates += (yearMonth -> calendar.getTime())
calendar.add(Calendar.DATE, 1)
}
maxDates += (yearMonth -> calendar.getTime())
for(eachMonth <- maxDates.keySet){
dateList += maxDates(eachMonth)
}
var dateListString = "";
for( date <- dateList.sorted){
dateListString=dateListString+","+format.format(date)
}
dateListString.substring(1, dateListString.length())
}
def getFirstDateFromLastDateAndReference = (formatString:String, refDateString:String, lastDate:String) => {
val format = new SimpleDateFormat(formatString)
val firstDay = getFirstDateOfMonth(format.parse(lastDate))
val year = firstDay.getYear;
val month = firstDay.getMonth;
val refDate = format.parse(refDateString)
val cal = Calendar.getInstance()
cal.setTime(refDate)
val refDateTime = cal.getTime();
val refYear=refDateTime.getYear;
val refMonth = refDateTime.getMonth();
if(year==refYear&& month==refMonth){
refDateString
}else{
format.format(firstDay)
}
}
sqlContext.udf.register("lastday",getLastDaysBetweenDates)
sqlContext.udf.register("firstday",getFirstDateFromLastDateAndReference)
sqlContext.sql("select *,lastday('d-MMM-yy',c4,c5) from raw_data").show();
sqlContext.sql("select c0,c1,c2,c3,c4,c5,explode(split(lastday('d-MMM-yy',c4,c5),',')) as lastday from raw_data").registerTempTable("data_with_end_date");
sqlContext.sql("select c0,c1,c2,c3,c4,c5,lastday,firstday('d-MMM-yy',c4,lastday) from data_with_end_date").show()
I used two UDFs here:
1) getLastDaysBetweenDates - Consumes a date format, a start date and an end date, and returns a comma-separated list of the month-end dates in this range.
2) getFirstDateFromLastDateAndReference - Consumes a date format, a reference start date (pa_start_date) and a month-end date. Returns the first day of that month, except for the first month, where it returns pa_start_date instead of the first calendar day.
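As a possible variation on the second UDF, the same comparison can be sketched with java.time (Java 8 or later), which avoids the deprecated Date.getYear/getMonth calls and SimpleDateFormat, which is not thread-safe. FirstDaySketch and firstDay are hypothetical names, and Locale.ENGLISH is an assumption so that month abbreviations such as "Mar" parse regardless of the JVM default locale.

```scala
import java.time.{LocalDate, YearMonth}
import java.time.format.DateTimeFormatter
import java.util.Locale

object FirstDaySketch {
  // Same argument order as the "firstday" UDF: format, reference date, month-end date.
  def firstDay(formatString: String, refDateString: String, lastDate: String): String = {
    // Assumption: dates use English month abbreviations, e.g. "2-Mar-18".
    val fmt = DateTimeFormatter.ofPattern(formatString, Locale.ENGLISH)
    val ref = LocalDate.parse(refDateString, fmt)
    val last = LocalDate.parse(lastDate, fmt)
    // Same year and month as the reference date: keep the reference date itself.
    if (YearMonth.from(ref) == YearMonth.from(last)) refDateString
    // Otherwise return the first calendar day of that month.
    else last.withDayOfMonth(1).format(fmt)
  }
}
```

A function like this could be registered with sqlContext.udf.register in place of the original one, for example as FirstDaySketch.firstDay _, though that has not been tested against the Spark version used in this thread.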
Created on 06-14-2018 11:28 PM - edited 08-17-2019 07:11 PM
Output Data of the form