I ran some queries on Spark, first with a single node and then with 3 nodes, and in the Spark UI (port 4040) I see something I don't understand.
For example, after executing a query with 3 nodes and checking the results in the Spark UI, the "Input" column shows 2.8 GB, so Spark read 2.8 GB from HDFS. The same query with just one node in local mode shows 7.3 GB, so Spark read 7.3 GB from HDFS. But shouldn't these values be equal?
The shuffle value, for example, stays roughly the same with one node vs. 3 nodes. Why doesn't the input value stay the same too? The same amount of data must be read from HDFS, so I don't understand.
Do you know?
Below is the same query on the multi-node setup. As you can see, the input is smaller but the shuffle stays roughly equal. Do you know why?
When you are running multiple nodes, Spark will try to parallelize reads depending on the InputFormat, block sizes, and whether the format is splittable (for HDFS reads, it will use "dfs.blocksize" by default). With a single node, the entire input is read by that one node; the read is not parallelized, so that node's input metric shows the full data size. So for multi-node clusters, you can basically add up the various per-node input sizes, and the total should be relatively equivalent to the full data size.
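As a rough sketch of that last point (a hypothetical illustration, assuming a 7.3 GB file and the default 128 MB dfs.blocksize, with blocks simply round-robined over 3 nodes; the real scheduler also considers data locality, but the totals work out the same way):

```python
import math

# Hypothetical numbers: a ~7.3 GB input file split into 128 MB HDFS blocks.
total_input = int(7.3 * 1024**3)   # total input size in bytes
block_size = 128 * 1024**2         # default dfs.blocksize (128 MB)

num_blocks = math.ceil(total_input / block_size)

# Distribute the block reads over 3 worker nodes.
per_node_bytes = [0, 0, 0]
for block in range(num_blocks):
    # the last block may be smaller than a full block
    size = min(block_size, total_input - block * block_size)
    per_node_bytes[block % 3] += size

# Each node's "Input" in the UI shows only what that node read,
# but summed together the per-node figures equal the full input size.
assert sum(per_node_bytes) == total_input
```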
On the other hand, shuffle sizes depend largely on the transformations and actions you apply to the already formed RDDs or DataFrames, so shuffle read and write sizes may remain similar across cluster sizes. You can also change the default RDD partitioning with rdd.repartition() or rdd.coalesce(), both of which modify the number of partitions the data is split into.
Thanks for your help. I understood the shuffle part well; I just didn't understand this part: "So for multi-node clusters, you can basically add up the various input sizes and that should be relatively equivalent to the full data size." So the 2.8 GB that appears in the second image is just for one specific node out of the 3, and doesn't include the sizes read by the other two nodes?
When Spark reads a file from HDFS, it creates a single partition for each input split. The input split is determined by the Hadoop InputFormat used to read the file. For instance, if you use textFile(), the InputFormat is Hadoop's TextInputFormat, which returns one partition per HDFS block (though the split between partitions is made on line boundaries, not the exact block boundary), unless you have a compressed text file. In the case of a compressed file you get a single partition for the whole file, as compressed text files are not splittable.
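A quick sketch of the difference (a hypothetical helper, assuming the default 128 MB block size; a gzip file stands in for the non-splittable case):

```python
import math

block_size = 128 * 1024**2  # default dfs.blocksize (128 MB)

def num_partitions(file_size, splittable=True):
    # Splittable files get roughly one partition per HDFS block;
    # non-splittable (e.g. gzip) files get a single partition.
    return math.ceil(file_size / block_size) if splittable else 1

seven_gb = int(7.3 * 1024**3)
assert num_partitions(seven_gb) == 59                     # plain text: one per block
assert num_partitions(seven_gb, splittable=False) == 1    # file.gz: one partition
```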
The actual split size is computed by FileInputFormat.computeSplitSize using the formula:

return Math.max(minSize, Math.min(goalSize, blockSize));

where:

- minSize is the Hadoop parameter mapreduce.input.fileinputformat.split.minsize
- blockSize is the value of dfs.block.size in cluster mode and fs.local.block.size in local mode
- goalSize = totalInputSize / numPartitions, where totalInputSize is the total size in bytes of all the files in the input path, and numPartitions is the optional parameter provided to sc.textFile(inputPath, numPartitions)
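The formula above can be re-implemented in a few lines to see how the pieces interact (a minimal sketch, not the actual Hadoop code; the example values for minSize and blockSize are the usual defaults):

```python
def compute_split_size(min_size, goal_size, block_size):
    # Mirrors FileInputFormat.computeSplitSize:
    # Math.max(minSize, Math.min(goalSize, blockSize))
    return max(min_size, min(goal_size, block_size))

# Example: 7.3 GB of input, asking for 10 partitions.
total_input_size = int(7.3 * 1024**3)
num_partitions = 10
goal_size = total_input_size // num_partitions  # ~747 MB per requested partition
min_size = 1                                    # split.minsize default
block_size = 128 * 1024**2                      # dfs.blocksize default (128 MB)

split = compute_split_size(min_size, goal_size, block_size)
# goal_size exceeds block_size here, so the split size is capped at the block size
assert split == block_size
```

In other words, asking for fewer partitions than blocks does not give you splits larger than a block; the block size is the cap, and minSize is the floor.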