Created 07-08-2017 03:06 AM
Okay guys, I have a situation where I have the following schema for my data frame:
Customer_Id Role_Code Start_TimeStamp End_Timestamp
Ray123 1 2015 2017
Kate123 -- 2016 2017
I want to decide the Role_Code of a given customer (say "Ray123") based on a few conditions. Let's say his Role_Code comes out to be 1. When I then process the next row, if the next customer (say "Kate123") has a time period that overlaps with Ray123's, she can challenge Ray123 and might win against him to take Role_Code 1 (based on some other conditions). If she wins, then for the overlapping time period I need to set the Role_Code of Ray123 to 2, so the data looks like:
Customer_Id Role_Code Start_TimeStamp End_Timestamp
Ray123 1 2015 2016
Ray123 2 2016 2017
Kate123 1 2016 2017
Similar things happen where I need to go back and forth, pick rows, compare the timestamps and some other fields, then take unions, do except and so on to get a final data frame with the correct set of customers and the correct role codes. The problem is that the solution works fine if I have 5-6 rows, but if I test against e.g. 70 rows, YARN kills the container because the job always runs out of memory. I don't know how else to solve this problem without multiple actions such as head(), first() etc. getting in the way to process each row and then split the rows effectively. Any input would help at this point!
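To make the desired transformation concrete, here is a minimal plain-Python sketch of the split for a single pair of rows. The function name `demote_on_overlap` and the tuple layout are hypothetical, the end year is assumed to be exclusive, and deciding who wins (the "other conditions") is left outside the function because the post does not spell it out.

```python
def demote_on_overlap(holder, challenger):
    """Rows after `challenger` has won the overlapping period from `holder`.

    Each row is a (customer_id, role_code, start, end) tuple, with `end`
    treated as exclusive. Deciding who wins is out of scope here.
    """
    h_id, h_role, h_start, h_end = holder
    c_id, _, c_start, c_end = challenger
    # Overlapping window shared by the two rows.
    lo, hi = max(h_start, c_start), min(h_end, c_end)
    if lo >= hi:
        # No overlap, so nothing changes.
        return [holder, challenger]
    rows = []
    if h_start < lo:
        rows.append((h_id, h_role, h_start, lo))   # holder before the overlap
    rows.append((h_id, h_role + 1, lo, hi))        # holder demoted in the overlap
    if hi < h_end:
        rows.append((h_id, h_role, hi, h_end))     # holder after the overlap
    rows.append((c_id, h_role, c_start, c_end))    # challenger takes the role
    return rows


result = demote_on_overlap(("Ray123", 1, 2015, 2017), ("Kate123", None, 2016, 2017))
for row in result:
    print(*row)
```

For the two example rows this reproduces the three-row table shown above. Running it per pair on the driver would still not scale, so it is only meant to pin down the expected output.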
Created 07-11-2017 01:55 PM
@Geetika Guleria you might consider reconstructing your original data so that every row is split into all of its unit intervals,
e.g. Ray123 1 2015 2017 -> Ray123 1 2015 2016 and Ray123 1 2016 2017
Then assign the rank relative to the other rows by mapping on the start_end timestamp,
e.g. 2015_2016 -> [Ray123] , 2016_2017 -> [Ray123,Kate]
The problem could be preserving the sequence of the rows, though. Do you have other columns to track that, e.g. a sequence or an id?
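The interval idea above can be sketched in plain Python as follows. This is only an illustration under the assumption that the timestamps are whole years, as in the sample data; the helper `unit_intervals` and the variable names are made up for the example. On an RDD, the same shape could probably be expressed with `flatMap` followed by `groupByKey`.

```python
from collections import defaultdict


def unit_intervals(start, end):
    """Split [start, end) into consecutive one-year intervals."""
    return [(year, year + 1) for year in range(start, end)]


# (customer_id, start, end) rows in their original order.
rows = [("Ray123", 2015, 2017), ("Kate", 2016, 2017)]

# Map each one-year interval to the customers active in it, in row order.
customers_by_interval = defaultdict(list)
for customer_id, start, end in rows:
    for lo, hi in unit_intervals(start, end):
        customers_by_interval[f"{lo}_{hi}"].append(customer_id)

interval_map = dict(customers_by_interval)
print(interval_map)
# {'2015_2016': ['Ray123'], '2016_2017': ['Ray123', 'Kate']}
```

Each list can then be ranked independently, so no step needs to look back at previously processed rows.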
Created 07-11-2017 01:55 PM
I got something working in PySpark (supposing you have the priorities in the last column of every row). You could have the priorities defined somewhere else too and use them as input to the sorting function. I tested it with 200 rows of data and it ran quite fast without any errors. I'm not sure whether this will scale up, because it keeps all records in memory in a dictionary-like structure for the final sorting. Anyway, you can try this.
Format of roleinput.txt:
wcrdz - 2013 2015 4
psdzn - 2013 2015 2
imjvl - 2014 2017 4
pzpzv - 2014 2017 2
ngsdv - 2012 2015 3
gptxb - 2013 2015 1
(The role is initially - and is assigned at the end, unique within a time group. Priority is indicated by the last column.)
import pyspark
import sys
from pyspark import SparkContext
sc=SparkContext(appName='role')
text=sc.textFile('/user/user1/roleinput.txt')
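# Key each record by its start and end year so records with the same time window end up together
overlap_dict=text.map(lambda x:(x.split()[2]+x.split()[3],[x]))
# Merge the records of each window into one list sorted by priority (last column, lowest first)
overlap_sorted_by_priority=overlap_dict.reduceByKey(lambda a,b:sorted(a+b,key=lambda x:int(x.split()[-1])))
# The 1-based position in the sorted list becomes the role code
assigned_roles=overlap_sorted_by_priority.map(lambda x:x[1] if isinstance(x[1],list) else [x[1]]).map(lambda x:[' '.join([item.split()[0],str(x.index(item)+1),item.split()[2],item.split()[3]]) for item in x])
assigned_roles.flatMap(lambda x:x).collect()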
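For checking the logic without a cluster, here is a plain-Python sketch of what I understand the job above to compute, run on the six sample rows. The function name `assign_roles` is made up for illustration. Note that, like the PySpark version, it only groups rows whose start and end are identical, so partially overlapping windows such as 2013-2015 and 2014-2017 are never compared with each other.

```python
from collections import defaultdict

lines = [
    "wcrdz - 2013 2015 4",
    "psdzn - 2013 2015 2",
    "imjvl - 2014 2017 4",
    "pzpzv - 2014 2017 2",
    "ngsdv - 2012 2015 3",
    "gptxb - 2013 2015 1",
]


def assign_roles(lines):
    """Rank customers within each identical (start, end) window by priority."""
    groups = defaultdict(list)
    for line in lines:
        customer, _, start, end, priority = line.split()
        groups[(start, end)].append((int(priority), customer))
    out = []
    for (start, end), members in groups.items():
        # Lowest priority number gets role 1, the next gets role 2, and so on.
        ranked = sorted(members, key=lambda m: m[0])
        for role, (_, customer) in enumerate(ranked, start=1):
            out.append(f"{customer} {role} {start} {end}")
    return out


assigned = assign_roles(lines)
for row in assigned:
    print(row)
```

Here the 2013-2015 group comes out as gptxb 1, psdzn 2, wcrdz 3, the 2014-2017 group as pzpzv 1, imjvl 2, and ngsdv is alone in 2012-2015 with role 1. The order of groups returned by collect() on the cluster may differ.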
Created 08-07-2017 06:53 AM
Useful information