Support Questions

Spark gets slower in an iterative process.

Explorer

Hello!

I am new to Spark (PySpark, to be concrete). In my task I have a DataFrame of 300 million rows cached in memory. On the other hand, I have 215 files, each of which I can read into a DataFrame of 600k rows. I want to iterate file by file, find the intersection with the cached DataFrame on some field, and write the result to another file.

The first 10-12 iterations complete pretty fast and take roughly equal time, 20-25 sec each; I monitor them in the Spark UI. However, the execution time of the next few iterations increases dramatically, to 1-2 min and more, specifically at the write operation. Finally, the whole job gets stuck, so I have to kill it.

Could you please give me an idea why this happens? Which resources do I need to release (and how) to start each iteration from scratch?
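
(For context, a minimal sketch of the setup described above, as an assumption rather than the author's actual code: the 300-million-row DataFrame is cached once so the per-file loop can reuse it. The variable name df_rec matches the code posted below; the source path and read options are hypothetical, and sqlContext is assumed to come from the existing session.)

    # Hypothetical sketch: cache the large DataFrame once, before the loop.
    df_rec = (sqlContext.read
              .format('com.databricks.spark.csv')
              .options(header='true', inferschema='true')
              .load('/path/to/large_table.csv'))   # illustrative path
    df_rec.cache()    # mark the DataFrame for in-memory caching
    df_rec.count()    # run one action so the cache is actually materialized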

3 REPLIES

Explorer
count = 0
ln = len(bkg_lst)  # bkg_lst contains the paths to the input files
for i in bkg_lst[:]:
    # read the next file into a DataFrame
    part_bkg = (sqlContext.read
                .option("delimiter", ";")
                .format('com.databricks.spark.csv')
                .options(header='false', inferschema='true')
                .load(i))
    if len(part_bkg.head(1)) != 0:
        # keep only the columns that are needed
        part_bkg = part_bkg.select('BKG_ID', 'PNR_ID', 'PAX_ID', 'SEG_NBR', 'PNR_CRTD_DT',
                                   'OPRTNG_FLT_DT_UTC', 'TRAVELER_ORIG_TERMINAL',
                                   'TRAVELER_DEST_TERMINAL', 'DEL_FLG', 'SRC_SYS_CRTD_TMSTMP')
        part_bkg.createOrReplaceTempView("bkg")
        # here goes the necessary SQL select, not heavy for sure
        res_df = sqlContext.sql("""select b.ucp_id from ...""")
        if len(res_df.head(1)) != 0:
            (res_df.select('UCP_ID', 'TRAVELER_DEST_TERMINAL')
             .intersect(df_rec.select('UCP_ID', 'ITEM_KEY'))
             .select('UCP_ID')
             .coalesce(1)
             .write.save(path='/tmp/true_recs/' + str(count), format='csv', mode='append', sep=','))
    count += 1
    print(count, " of ", ln)

@Stanislav Nikitin did you check the GC time in the Spark UI?
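
(GC time per executor is shown in the Executors tab of the Spark UI. If more detail is needed, executor GC logging can be turned on when the application starts; the sketch below is an assumption, not part of the thread, and uses the Java 8 style flags from the Spark tuning guide with a made-up application name.)

    # Hypothetical sketch: enable executor GC logging so slow iterations can
    # be correlated with GC pauses; requires restarting the application.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("bkg-intersect")   # illustrative app name
             .config("spark.executor.extraJavaOptions",
                     "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
             .getOrCreate())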

Explorer

@Felix Albani

Thank you for answering! Yes, I was monitoring the progress in the Spark web UI. The number of jobs did not grow from iteration to iteration, while the time spent on those jobs did.

To be honest, I worked around my problem: I simply read all the data in one go and successfully joined it with the other DataFrame. However, I would still like to know why "for" loops are so inefficient in Spark.
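
(A minimal sketch of that workaround, assuming the 215 files share a directory so a wildcard path loads them in a single call, and that df_rec is the cached 300-million-row DataFrame from the original post. The directory path and the join column 'UCP_ID' are illustrative assumptions, not taken from the thread.)

    # Hypothetical sketch: read every file at once, then do one join instead
    # of 215 small loop iterations.
    all_bkg = (sqlContext.read
               .option("delimiter", ";")
               .format('com.databricks.spark.csv')
               .options(header='false', inferschema='true')
               .load('/path/to/bkg_files/*.csv'))   # wildcard picks up all files

    result = all_bkg.join(df_rec, 'UCP_ID').select('UCP_ID')

    result.coalesce(1).write.save(path='/tmp/true_recs_all', format='csv',
                                  mode='overwrite', sep=',')

One common explanation for the slowdown is that each loop iteration launches its own Spark jobs, so scheduling and plan-building overhead accumulates on the driver, whereas a single read-and-join lets Spark build and optimize one plan for all the data.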
