I'm trying to process many files (several hundred thousand files stored in HDFS). By "process", I mean reading those binary files, parsing their content and storing "human-readable" content in an HBase table.
My first implementation was quite straightforward: a Java application running on an edge node, iterating over the list of files and processing them sequentially. This implementation needed between 15 s and 2 min to process a single file (depending on its size), so it seemed that processing the whole set of files would take several weeks.
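For scale, here is a quick back-of-the-envelope estimate of the sequential run time as a small Java sketch. It assumes 100,000 files (the lower bound mentioned further down) and uses the 15 s and 2 min per-file bounds quoted above; the class and method names are only illustrative.

```java
// Back-of-the-envelope estimate of the sequential run time.
// ASSUMPTION: 100,000 files; 15 s and 120 s are the per-file bounds quoted above.
class SequentialEstimate {
    static final double SECONDS_PER_DAY = 86_400.0;

    // Total days if every file takes secondsPerFile, processed one after another.
    static double days(long files, double secondsPerFile) {
        return files * secondsPerFile / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        long files = 100_000L; // assumed lower bound
        System.out.printf("best case:  %.1f days%n", days(files, 15));
        System.out.printf("worst case: %.1f days%n", days(files, 120));
    }
}
```

That comes out at roughly 17 to 139 days, so several weeks even in the best case.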
That's why I decided to parallelize these operations over our cluster to save time. I implemented a very basic Spark job: it computes a list of paths (the paths of the files to process) and parallelizes this list using the following API:
JavaRDD<String> distFilePaths = sc.parallelize(listOfFilesPaths, 100);
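To reason about what `parallelize(list, 100)` does with the list, this plain-Java sketch (no Spark dependency) models how a local collection is cut into contiguous, equal-count slices. It assumes the slicing follows the `start = i * n / numSlices` index rule Spark uses for in-memory collections; `ParallelizeSlices` is a hypothetical name and this is a model, not Spark's code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Spark) of how a local list is cut into numSlices
// contiguous chunks. ASSUMPTION: mirrors the index rule Spark uses for
// in-memory collections in parallelize(); this is a model, not Spark's code.
class ParallelizeSlices {
    static <T> List<List<T>> slice(List<T> items, int numSlices) {
        if (numSlices < 1) throw new IllegalArgumentException("numSlices must be >= 1");
        long n = items.size();
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < numSlices; i++) {
            // Slice i covers indices [i*n/numSlices, (i+1)*n/numSlices).
            int start = (int) ((i * n) / numSlices);
            int end = (int) (((i + 1) * n) / numSlices);
            slices.add(new ArrayList<>(items.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 1005; i++) paths.add("/data/file-" + i); // hypothetical paths
        int min = Integer.MAX_VALUE, max = 0;
        for (List<String> s : slice(paths, 100)) {
            min = Math.min(min, s.size());
            max = Math.max(max, s.size());
        }
        System.out.println("slice sizes between " + min + " and " + max); // 10 and 11 here
    }
}
```

The split is by count only: if per-file time varies between 15 s and 2 min, equal-count slices can still represent very different amounts of work.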
I tried many different configurations for the number of executors, cores per executor, and number of partitions (used to split my list of files; 100 in the example above), but noticed that performance dropped dramatically with this implementation: processing a single file in an executor now takes 5 to 10 times longer (sometimes much more) than with the previous implementation.
I couldn't figure out why, because I am new to Spark and not able to perform profiling or equivalent analysis on the cluster, so I have a bunch of questions:
Any help would be really appreciated. Of course, I can provide more details about my application if I missed some important points.
Among the things I tried: I suspected that parallelizing the whole set of file paths (more than 100,000) in one shot, without specifying the number of partitions, was a problem, because it seemed that some containers were assigned very few files whereas others had many, many files to process. As a result, some containers sat idle after a while, waiting for the others to complete their tasks...
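If the uneven load comes from file sizes rather than file counts, one option (a swapped-in technique, not something the post already does) is to group paths by size before parallelizing, using the greedy "largest file into the currently lightest partition" heuristic. This sketch assumes processing time roughly tracks file size and that sizes are known up front, e.g. from an HDFS listing; `SizeBalancedPartitions` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Greedy size-balanced grouping ("largest file into the lightest partition").
// ASSUMPTIONS: processing time roughly follows file size, and sizes are known
// before parallelizing. A swapped-in technique, not the post's current approach.
class SizeBalancedPartitions {
    /** For each partition, the indices (into sizes) of the files assigned to it. */
    static List<List<Integer>> assign(long[] sizes, int numPartitions) {
        if (numPartitions < 1) throw new IllegalArgumentException("numPartitions must be >= 1");
        Integer[] order = new Integer[sizes.length];
        for (int i = 0; i < order.length; i++) order[i] = i;
        // Visit files from largest to smallest.
        Arrays.sort(order, Comparator.comparingLong((Integer i) -> sizes[i]).reversed());

        long[] load = new long[numPartitions]; // total bytes per partition
        List<List<Integer>> parts = new ArrayList<>();
        // Min-heap of partition ids, ordered by current load.
        PriorityQueue<Integer> lightest =
                new PriorityQueue<>(Comparator.comparingLong((Integer p) -> load[p]));
        for (int p = 0; p < numPartitions; p++) {
            parts.add(new ArrayList<>());
            lightest.add(p);
        }
        for (int f : order) {
            int p = lightest.poll();   // remove before its load changes
            parts.get(p).add(f);
            load[p] += sizes[f];
            lightest.add(p);           // re-insert with the updated load
        }
        return parts;
    }

    public static void main(String[] args) {
        long[] sizes = {8, 7, 6, 5, 4}; // hypothetical sizes, e.g. in MB
        System.out.println(assign(sizes, 2)); // two groups with totals 13 and 17
    }
}
```

Each group could then be mapped back to its paths and parallelized with one slice per group (e.g. `sc.parallelize(groups, groups.size())`), so each task receives a group of roughly equal total size. Whether this helps depends on size actually being the dominant cost.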
For that reason, I tried to reduce the number of files distributed at once: in my driver application, I added a loop that "parallelizes" the files in batches of 1000, so that every 1000 files a fair distribution is done to obtain a better spread over my containers. However, I couldn't understand exactly how Spark proceeded.
Basically, for each iteration of my driver, I have 1000 files to process. I arbitrarily split these files into 100 partitions => Spark has 100 tasks to complete, each one handling 10 files.
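The batching loop and the 1000 / 100 = 10 files-per-task arithmetic can be sketched in plain Java. This assumes that in the real driver each batch is passed to `sc.parallelize(batch, 100)` and an action is run on it before the next batch starts; `DriverBatches` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Driver-side batching, in plain Java.
// ASSUMPTION: in the real driver each batch would go to
//   sc.parallelize(batch, partitionsPerBatch)
// followed by an action, before the next batch starts.
class DriverBatches {
    static <T> List<List<T>> batches(List<T> all, int batchSize) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        List<List<T>> out = new ArrayList<>();
        for (int from = 0; from < all.size(); from += batchSize) {
            int to = Math.min(from + batchSize, all.size());
            out.add(new ArrayList<>(all.subList(from, to)));
        }
        return out;
    }

    // Largest number of files a single task gets when a batch of batchSize files
    // is split into `partitions` equal-count slices (ceiling division).
    static int maxFilesPerTask(int batchSize, int partitions) {
        return (batchSize + partitions - 1) / partitions;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 2500; i++) paths.add("/data/file-" + i); // hypothetical paths
        List<List<String>> bs = batches(paths, 1000);
        System.out.println(bs.size() + " batches");                     // 3 batches
        System.out.println(maxFilesPerTask(1000, 100) + " files/task"); // 10 files/task
    }
}
```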
If I ask for 20 executors with 2 cores per executor (for example, and supposing that YARN actually provides me such resources), it means that Spark should be able to run 40 tasks in parallel and keep the remaining 60 tasks to feed the containers as they complete their first tasks... Am I right?
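The reasoning in the question can be checked with a toy model of task slots, not Spark's actual scheduler. It assumes `spark.task.cpus` keeps its default of 1, that YARN grants every requested executor, and that a freed core immediately picks up the next pending task; `TaskWaves` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Toy model of task slots, NOT Spark's real scheduler.
// ASSUMPTIONS: spark.task.cpus = 1 (the default), YARN grants all requested
// executors, and a freed core immediately starts the next pending task.
class TaskWaves {
    // Tasks that can run at the same time across the application.
    static int concurrentSlots(int executors, int coresPerExecutor, int taskCpus) {
        return executors * (coresPerExecutor / taskCpus);
    }

    // Time at which the last task finishes, with tasks handed out in order
    // to whichever slot becomes free first.
    static double makespan(double[] taskDurations, int slots) {
        if (slots < 1) throw new IllegalArgumentException("slots must be >= 1");
        PriorityQueue<Double> freeAt = new PriorityQueue<>();
        for (int s = 0; s < slots; s++) freeAt.add(0.0);
        double end = 0.0;
        for (double d : taskDurations) {
            double start = freeAt.poll(); // earliest available slot
            double finish = start + d;
            freeAt.add(finish);
            end = Math.max(end, finish);
        }
        return end;
    }

    public static void main(String[] args) {
        int slots = concurrentSlots(20, 2, 1);      // 40
        double[] equal = new double[100];
        Arrays.fill(equal, 1.0);                    // 100 tasks of 1 time unit
        System.out.println(slots + " slots, makespan " + makespan(equal, slots)); // 40 slots, makespan 3.0
    }
}
```

With 100 equal tasks and 40 slots this gives three waves (40, 40, then 20). Under the same model, a single long task dominates: with durations {10, 1, 1, 1} on 2 slots the makespan is 10, because one slot is busy with the long task while the other finishes everything else.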
One additional detail that may be important: it seems that the first tasks are pretty fast (almost as fast as the non-Spark implementation), but subsequent ones get slower and slower over time, until a new batch of 1000 files is started.
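Since profiling the cluster is not an option, a lightweight way to confirm this pattern is to log how long each file takes inside a task. In this sketch, `processFile` stands for the existing parse-and-write-to-HBase logic and is a hypothetical placeholder; inside a Spark task the loop would run over the partition's paths, and anything written to `System.err` ends up in the executor's stderr log in YARN.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Per-file timing, as a stand-in for profiling.
// ASSUMPTION: processFile is a hypothetical placeholder for the existing
// parse-and-write-to-HBase logic. Inside a Spark task this loop would run over
// the partition's paths; System.err output lands in the executor's stderr log.
class PerFileTiming {
    static List<Long> timeEach(List<String> paths, Consumer<String> processFile) {
        List<Long> millis = new ArrayList<>();
        for (int i = 0; i < paths.size(); i++) {
            long t0 = System.nanoTime();
            processFile.accept(paths.get(i));
            long ms = (System.nanoTime() - t0) / 1_000_000;
            millis.add(ms);
            System.err.printf("file #%d (%s) took %d ms%n", i, paths.get(i), ms);
        }
        return millis;
    }

    public static void main(String[] args) {
        List<String> paths = List.of("/data/a.bin", "/data/b.bin"); // hypothetical paths
        timeEach(paths, p -> { /* parse p and write to HBase here */ });
    }
}
```

If per-file times grow with the file index within a task regardless of file size, that would suggest something accumulating across files inside the task rather than the files themselves being slower.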
This is precisely the kind of issue I am facing too. The initial tasks finish within seconds, and the rest take a massive amount of time, up to hours, and eventually fail. Any improvements or discoveries?