
What is the best way to perform row wise calculations in Spark for my problem? Details below

New Contributor

Okay guys, I have a situation where I have the following schema for my data frame:

Customer_Id  Role_Code  Start_TimeStamp  End_Timestamp
Ray123       1          2015             2017
Kate123      --         2016             2017

I want to decide the Role_Code of a given customer (say "Ray123") based on a few conditions. Let's say his Role_Code comes out to be 1. I then process the next row, and if the next customer (say "Kate123") has a time range that overlaps with Ray123's, she can challenge Ray123 and might win against him to take Role_Code 1 (based on some other conditions). If she wins, then for the overlapping time period I need to set Ray123's Role_Code to 2, so the data looks like:

Customer_Id  Role_Code  Start_TimeStamp  End_Timestamp
Ray123       1          2015             2016
Ray123       2          2016             2017
Kate123      1          2016             2017

There are similar cases where I need to go back and forth, pick rows, compare the timestamps and some other fields, and then apply union, except, and so on to get a final data frame with the correct set of customers and role codes. The problem is that the solution works fine with 5-6 rows, but when I test against, e.g., 70 rows, YARN kills the container because the job always runs out of memory. I don't know how else to solve this without multiple actions such as head() and first() to process each row and then split the rows. Any input would help at this point!


Cloudera Employee

@Geetika Guleria you might consider restructuring your original data so that every row is split into its individual intervals,

e.g. Ray123 1 2015 2017 -> Ray123 1 2015 2016 and Ray123 1 2016 2017

and then assign ranks relative to the other rows by keying each row on its start_end timestamp pair,

e.g. 2015_2016 -> [Ray123] , 2016_2017 -> [Ray123,Kate]

The problem may be preserving the sequence of the rows, though. Do you have other columns to track that, e.g. a sequence number or an id?
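The interval-splitting idea above can be sketched in plain Python as follows. This is only an illustration of the logic, not Spark code: the function names `split_into_intervals` and `customers_by_interval` are hypothetical, and rows are assumed to be `(customer_id, start, end)` tuples with integer years. In Spark, the same per-row split could probably be expressed with a `flatMap` followed by a grouping on the `start_end` key.

```python
from collections import defaultdict

def split_into_intervals(rows):
    """Cut every row at each boundary (start or end) seen anywhere in the data."""
    boundaries = sorted({t for _, start, end in rows for t in (start, end)})
    pieces = []
    for customer, start, end in rows:
        # Boundaries that fall inside this row's own range, in ascending order.
        cuts = [t for t in boundaries if start <= t <= end]
        for lo, hi in zip(cuts, cuts[1:]):
            pieces.append((customer, lo, hi))
    return pieces

def customers_by_interval(pieces):
    """Map each 'start_end' key to the customers active in that interval."""
    groups = defaultdict(list)
    for customer, lo, hi in pieces:
        groups[f"{lo}_{hi}"].append(customer)
    return dict(groups)

rows = [("Ray123", 2015, 2017), ("Kate", 2016, 2017)]
pieces = split_into_intervals(rows)
by_interval = customers_by_interval(pieces)
```

Once every interval has its own list of customers, the "challenge" rules only need to be applied within each list, so no row has to be revisited with `head()` or `first()`.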

Expert Contributor

@Geetika Guleria

I got something working in PySpark (assuming the priority is given in the last column of every row). You could also define the priorities somewhere else and use that as an input to the sorting function. I tested it with 200 rows of data and it ran quickly without errors. I'm not sure it will scale up, because it keeps all records in memory in a dictionary-like structure for the final sorting. Anyway, you can try this.

Format of roleinput.txt:

wcrdz - 2013 2015 4
psdzn - 2013 2015 2
imjvl - 2014 2017 4
pzpzv - 2014 2017 2
ngsdv - 2012 2015 3
gptxb - 2013 2015 1

(The role is initially "-" and is assigned at the end, unique within a time group. The priority is given in the last column.)

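import pyspark
import sys
from pyspark import SparkContext

# Reuse the shell's context if one exists, otherwise create one.
sc = SparkContext.getOrCreate()

# Key each input line by its time group (start + end), with the line wrapped in a list.
text = sc.textFile('/user/user1/roleinput.txt')
overlap_dict = text.map(lambda x: (x.split()[2] + x.split()[3], [x]))

# Merge the lists of each time group, keeping them sorted by priority (last column).
overlap_sorted_by_priority = overlap_dict.reduceByKey(lambda a, b: sorted(a + b, key=lambda x: int(x.split()[-1])))

# The role is the 1-based position of each line within its sorted time group.
assigned_roles = overlap_sorted_by_priority.map(lambda x: x[1] if isinstance(x[1], list) else [x[1]]).map(lambda x: [' '.join([item.split()[0], str(x.index(item) + 1), item.split()[2], item.split()[3]]) for item in x])

assigned_roles.flatMap(lambda x: x).collect()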
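To check the expected result on the six sample rows without a cluster, the same grouping and ranking can be sketched in plain Python. This is only a local approximation of the PySpark snippet above: the helper name `assign_roles` is hypothetical, the output order (sorted by time group) is a choice made here, and ties in priority are broken by customer name, which the original does not specify. Like the snippet, it only groups rows whose start and end years match exactly, so partially overlapping ranges are not compared with each other.

```python
from collections import defaultdict

def assign_roles(lines):
    """Group rows by identical (start, end) and rank them by priority."""
    groups = defaultdict(list)
    for line in lines:
        customer, _role, start, end, priority = line.split()
        groups[(start, end)].append((int(priority), customer))

    result = []
    for (start, end), members in sorted(groups.items()):
        # Lowest priority number gets role 1, the next gets role 2, and so on.
        for rank, (_priority, customer) in enumerate(sorted(members), start=1):
            result.append(f"{customer} {rank} {start} {end}")
    return result

sample = [
    "wcrdz - 2013 2015 4",
    "psdzn - 2013 2015 2",
    "imjvl - 2014 2017 4",
    "pzpzv - 2014 2017 2",
    "ngsdv - 2012 2015 3",
    "gptxb - 2013 2015 1",
]
roles = assign_roles(sample)
```

For the sample input, `gptxb` gets role 1 in the 2013-2015 group because it has the lowest priority number there, followed by `psdzn` and `wcrdz`.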

New Contributor

Useful information
