I am new to Spark (PySpark, to be specific). In my task I have a DataFrame of 300 million rows cached in memory. I also have 215 files, each of which I can read into a DataFrame of 600k rows. I want to read the files one by one, find the intersection of each with the cached DataFrame on some field, and write the result to another file.
The first 10-12 iterations run quickly and take about the same time, 20-25 seconds each; I monitor them in the Spark UI. However, the execution time of the next few iterations increases dramatically, to 1-2 minutes and more, specifically in the write operation. Eventually the whole job gets stuck and I have to kill it.
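To pin down which iteration and which step slows down, it may help to label each iteration's jobs in the Spark UI and record wall-clock time per iteration. A minimal sketch, assuming `sc` is the active `SparkContext` (for example `spark.sparkContext`); `track_iteration` and `timings` are hypothetical names, and `SparkContext.setJobGroup` is the only Spark call used.

```python
import time
from contextlib import contextmanager


@contextmanager
def track_iteration(sc, index, path, timings):
    """Label Spark jobs started inside the block and record elapsed seconds.

    `sc` is assumed to be the active SparkContext. setJobGroup makes jobs
    triggered inside the block show up under this group/description in the
    Spark UI, so slow iterations can be matched to their jobs.
    """
    sc.setJobGroup(f"iter-{index}", f"iteration {index}: {path}")
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record (iteration index, seconds) even if the body raised.
        timings.append((index, time.perf_counter() - start))
```

In the question's loop this would wrap each file's work, e.g. `for n, path in enumerate(bkg_lst): with track_iteration(spark.sparkContext, n, path, timings): ...`, after which `timings` shows where the duration starts to climb.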
Could you please give me an idea of why this happens? Which resources do I need to release (and how) so that each iteration starts from scratch?
```python
count = 0
ln = len(bkg_lst)
start = 0
for i in bkg_lst[:]:  # list contains paths to files
    part_bkg = (sqlContext.read
                .option("delimiter", ";")
                .format('com.databricks.spark.csv')
                .options(header='false', inferschema='true')
                .load(i))
    if len(part_bkg.head(1)) != 0:
        # filter out unnecessary columns
        part_bkg = part_bkg.select('BKG_ID', 'PNR_ID', 'PAX_ID', 'SEG_NBR', 'PNR_CRTD_DT',
                                   'OPRTNG_FLT_DT_UTC', 'TRAVELER_ORIG_TERMINAL',
                                   'TRAVELER_DEST_TERMINAL', 'DEL_FLG', 'SRC_SYS_CRTD_TMSTMP')
        part_bkg.createOrReplaceTempView("bkg")
        # here goes the necessary SQL select, not heavy for sure
        res_df = sqlContext.sql("""select b.ucp_id from ...""")
        if len(res_df.head(1)) != 0:
            (res_df.select('UCP_ID', 'TRAVELER_DEST_TERMINAL')
                   .intersect(df_rec.select('UCP_ID', 'ITEM_KEY'))
                   .select('UCP_ID')
                   .coalesce(1)
                   .write.save(path='/tmp/true_recs/' + str(count), format='csv',
                               mode='append', sep=','))
    count += 1
    print(count, " of ", ln)
```
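On the "which resources" question: in this loop nothing is explicitly released; each iteration re-registers the `bkg` view and builds new DataFrames. One hygiene step is to scope the temporary view to its iteration. A minimal sketch, assuming a `SparkSession` named `spark`; `temp_view` is a hypothetical helper built on `createOrReplaceTempView` and `Catalog.dropTempView`. It is not a confirmed fix for the slowdown.

```python
from contextlib import contextmanager


@contextmanager
def temp_view(spark, df, name):
    """Register `df` as a temporary view for the duration of the block.

    The view is dropped on exit, even if the body raises, via
    spark.catalog.dropTempView, so no view outlives its iteration.
    """
    df.createOrReplaceTempView(name)
    try:
        yield name
    finally:
        spark.catalog.dropTempView(name)
```

Inside the loop, everything from `createOrReplaceTempView("bkg")` onward would move under `with temp_view(spark, part_bkg, "bkg"):`. If a per-file DataFrame is ever cached, `part_bkg.unpersist()` at the end of the iteration releases it; `spark.catalog.clearCache()` is not a good substitute here, because it would also evict the cached `df_rec`.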
Thank you for answering! Yes, I was monitoring the progress in the Spark web UI. The number of jobs did not grow from iteration to iteration, but the time spent on those jobs did.
To be honest, I have solved my problem: I simply read the whole file and successfully joined it with the other DataFrame. However, I still want to know why `for` loops are so inefficient in Spark.
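Every action in the per-file loop (the two `head(1)` checks and the write) launches at least one separate Spark job, so 215 files mean several hundred small jobs, each with its own scheduling overhead. Reading everything in one pass, which is what worked here, avoids that. Because `DataFrameReader.csv` accepts a list of paths, the same pipeline can also run on groups of files as a middle ground. A sketch under assumptions: `spark` is a `SparkSession`; `batched_paths`, `write_matches` and `to_res_df` are hypothetical names, and `to_res_df` stands in for the elided SQL query.

```python
from itertools import islice


def batched_paths(paths, size):
    """Yield consecutive lists of at most `size` paths, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(paths)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def write_matches(spark, df_rec, paths, out_dir, to_res_df, batch_size=None):
    """Read the CSV files in batches (all at once when batch_size is None),
    intersect each batch with df_rec, and write one output per batch.

    `to_res_df` is a hypothetical callable replacing the elided SQL; it must
    return a DataFrame with UCP_ID and TRAVELER_DEST_TERMINAL columns.
    """
    size = batch_size or max(len(paths), 1)
    for n, batch in enumerate(batched_paths(paths, size)):
        # DataFrameReader.csv takes a list of paths and reads them as one DataFrame.
        part_bkg = spark.read.csv(batch, sep=";", header=False, inferSchema=True)
        res_df = to_res_df(part_bkg)
        # intersect matches columns by position, as in the original loop.
        (res_df.select("UCP_ID", "TRAVELER_DEST_TERMINAL")
               .intersect(df_rec.select("UCP_ID", "ITEM_KEY"))
               .select("UCP_ID")
               .coalesce(1)
               .write.csv(f"{out_dir}/{n}", mode="append", sep=","))
```

With `batch_size=None` this is the single-read approach; a value such as 20 would produce about 11 batches instead of 215 iterations, trading memory per job against the number of jobs.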