Created 06-13-2018 02:33 PM
I have a dataframe containing rows like the ones below. I need to split each row into a month-wise series based on pa_start_date and pa_end_date, adding two new columns: period_start_date and period_end_date.
The input dataframe df is:
p_id pa_id p_st_date p_end_date pa_start_date pa_end_date
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17
The expected output is:
p_id pa_id p_st_date p_end_date pa_start_date pa_end_date period_start_date period_end_date
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 2-Mar-18 31-Mar-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Apr-18 30-Apr-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-May-18 31-May-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Jun-18 30-Jun-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Jul-18 31-Jul-18
p1 pa1 2-Jan-18 5-Dec-18 2-Mar-18 8-Aug-18 1-Aug-18 31-Aug-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 6-Mar-18 31-Mar-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Apr-18 30-Apr-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-May-18 31-May-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Jun-18 30-Jun-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Jul-18 31-Jul-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Aug-18 31-Aug-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Sep-18 30-Sep-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Oct-18 31-Oct-18
p1 pa2 3-Jan-18 8-Dec-18 6-Mar-18 10-Nov-18 1-Nov-18 30-Nov-18
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17 9-Feb-17 28-Feb-17
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17 1-Mar-17 31-Mar-17
p1 pa3 1-Jan-17 1-Dec-17 9-Feb-17 20-Apr-17 1-Apr-17 30-Apr-17
Created 06-13-2018 06:26 PM
Created 06-14-2018 02:02 AM
@Bernhard Walter: could you please help?
Created 06-14-2018 12:23 PM
@AArora, is the requirement to create multiple rows from one row, one for each month between pa_start_date and pa_end_date, with the first and last day of that month as the period start and end dates?
Created 06-14-2018 11:19 PM
This may not be the best approach, but it can be done as a two-step process.
Step 1
Load the content to a data frame
Apply a UDF to derive the set of period_end_date values for the given row
Explode the row based on the period_end_date
Step 2
Derive the period_start_date for the period_end_date based on the pa_start_date
You can either derive end date first and start date next or vice versa.
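The two steps can also be sketched outside Spark to check the date logic on its own. This is only an illustration, not the code from this answer: it assumes java.time (Java 8+) is available, and the names MonthlySplit, Assignment, monthlyPeriods and explode are hypothetical.

```scala
import java.time.{LocalDate, YearMonth}

object MonthlySplit {
  // Hypothetical row type for illustration; only the columns needed for the split.
  case class Assignment(paId: String, paStart: LocalDate, paEnd: LocalDate)

  // One (period_start, period_end) pair per calendar month touched by [start, end].
  // The first period starts on `start`; every period ends on the last day of its month.
  def monthlyPeriods(start: LocalDate, end: LocalDate): List[(LocalDate, LocalDate)] = {
    val first = YearMonth.from(start)
    val last = YearMonth.from(end)
    Iterator.iterate(first)(_.plusMonths(1))
      .takeWhile(ym => !ym.isAfter(last))
      .map { ym =>
        val periodStart = if (ym == first) start else ym.atDay(1)
        (periodStart, ym.atEndOfMonth())
      }
      .toList
  }

  // "Explode": each input row yields one output row per monthly period.
  def explode(rows: Seq[Assignment]): Seq[(Assignment, LocalDate, LocalDate)] =
    rows.flatMap(r => monthlyPeriods(r.paStart, r.paEnd).map { case (s, e) => (r, s, e) })
}
```

For the pa3 row from the question (9-Feb-17 to 20-Apr-17), monthlyPeriods yields three pairs: 9-Feb to 28-Feb, 1-Mar to 31-Mar, and 1-Apr to 30-Apr.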
Below is a code snippet. It can be optimized further.
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import java.util.{Calendar, Date, GregorianCalendar}
import java.text.SimpleDateFormat

// Load the comma-separated input as six string columns c0..c5
// (c4 = pa_start_date, c5 = pa_end_date) and register it as a temp table.
val csv = sc.textFile("/user/hdfs/ak/spark/197905/")
val rows = csv.map(line => line.split(",").map(_.trim))
val rdd = rows.map(row => Row(row(0), row(1), row(2), row(3), row(4), row(5)))
val schema = StructType((0 to 5).map(i => StructField("c" + i, StringType, true)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("raw_data")

// Last calendar day of the month containing `date`.
def getLastDateOfMonth(date: Date): Date = {
  val cal = Calendar.getInstance()
  cal.setTime(date)
  cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH))
  cal.getTime()
}

// First calendar day of the month containing `date`.
def getFirstDateOfMonth(date: Date): Date = {
  val cal = Calendar.getInstance()
  cal.setTime(date)
  cal.set(Calendar.DAY_OF_MONTH, cal.getActualMinimum(Calendar.DAY_OF_MONTH))
  cal.getTime()
}

// UDF 1: comma-separated month-end dates for every month between the two dates.
def getLastDaysBetweenDates = (formatString: String, startDateString: String, endDateString: String) => {
  val format = new SimpleDateFormat(formatString)
  val startdate = getLastDateOfMonth(format.parse(startDateString))
  val enddate = getLastDateOfMonth(format.parse(endDateString))
  val calendar = new GregorianCalendar()
  calendar.setTime(startdate)
  var yearMonth = ""
  // Walk day by day; the map keeps the latest date seen for each year_month key.
  val maxDates = scala.collection.mutable.Map[String, Date]()
  while (calendar.getTime().before(enddate)) {
    yearMonth = calendar.getTime().getYear() + "_" + calendar.getTime().getMonth()
    maxDates += (yearMonth -> calendar.getTime())
    calendar.add(Calendar.DATE, 1)
  }
  // The loop stops on the final month end, so record that date as well.
  maxDates += (yearMonth -> calendar.getTime())
  maxDates.values.toList.sorted.map(d => format.format(d)).mkString(",")
}

// UDF 2: first day of the month of `lastDate`, or the reference date itself
// when both fall in the same month (that is, the first period).
def getFirstDateFromLastDateAndReference = (formatString: String, refDateString: String, lastDate: String) => {
  val format = new SimpleDateFormat(formatString)
  val firstDay = getFirstDateOfMonth(format.parse(lastDate))
  val refDate = format.parse(refDateString)
  if (firstDay.getYear == refDate.getYear && firstDay.getMonth == refDate.getMonth) {
    refDateString
  } else {
    format.format(firstDay)
  }
}

sqlContext.udf.register("lastday", getLastDaysBetweenDates)
sqlContext.udf.register("firstday", getFirstDateFromLastDateAndReference)

// Step 1: derive the month-end dates and explode them into one row per month.
sqlContext.sql("select *, lastday('d-MMM-yy', c4, c5) from raw_data").show()
sqlContext.sql("select c0, c1, c2, c3, c4, c5, explode(split(lastday('d-MMM-yy', c4, c5), ',')) as lastday from raw_data").registerTempTable("data_with_end_date")

// Step 2: derive the period start date for each month-end date.
sqlContext.sql("select c0, c1, c2, c3, c4, c5, lastday, firstday('d-MMM-yy', c4, lastday) from data_with_end_date").show()
I used two UDFs here:
1) getLastDaysBetweenDates - Takes a date format, a start date and an end date, and returns a comma-separated list of month-end dates covering that range.
2) getFirstDateFromLastDateAndReference - Takes a date format, a reference start date (pa_start_date) and a month-end date. It returns the first day of that month, except for the first month, where it returns pa_start_date instead of the first calendar day.
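As a possible optimization, the same two contracts could be written with java.time, which steps month by month instead of day by day and avoids the deprecated Date.getYear and Date.getMonth calls. This is a sketch under assumptions, not the code used above: PeriodUdfs, lastDays and firstDay are hypothetical names, and Locale.ENGLISH is assumed so that month abbreviations such as "Feb" parse regardless of the JVM default locale.

```scala
import java.time.{LocalDate, YearMonth}
import java.time.format.DateTimeFormatter
import java.util.Locale

object PeriodUdfs {
  // Assumption: month names in the data are English abbreviations.
  private def formatter(pattern: String): DateTimeFormatter =
    DateTimeFormatter.ofPattern(pattern, Locale.ENGLISH)

  // Comma-separated month-end dates, from the month of `start` to the month of `end`.
  def lastDays(pattern: String, start: String, end: String): String = {
    val fmt = formatter(pattern)
    val first = YearMonth.from(LocalDate.parse(start, fmt))
    val last = YearMonth.from(LocalDate.parse(end, fmt))
    Iterator.iterate(first)(_.plusMonths(1))
      .takeWhile(ym => !ym.isAfter(last))
      .map(ym => ym.atEndOfMonth().format(fmt))
      .mkString(",")
  }

  // First day of the month of `lastDay`, or `refDate` unchanged when both
  // dates fall in the same month (the first period).
  def firstDay(pattern: String, refDate: String, lastDay: String): String = {
    val fmt = formatter(pattern)
    val refMonth = YearMonth.from(LocalDate.parse(refDate, fmt))
    val lastMonth = YearMonth.from(LocalDate.parse(lastDay, fmt))
    if (refMonth == lastMonth) refDate else lastMonth.atDay(1).format(fmt)
  }
}
```

For example, PeriodUdfs.lastDays("d-MMM-yy", "9-Feb-17", "20-Apr-17") gives "28-Feb-17,31-Mar-17,30-Apr-17". Registering these with something like sqlContext.udf.register("lastday", PeriodUdfs.lastDays _) should work the same way as above, though that has not been tried here.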
Created on 06-14-2018 11:28 PM - edited 08-17-2019 07:11 PM
Output Data of the form