Created on 03-31-2017 05:17 PM - edited 08-17-2019 01:27 PM
The pipelined sorter is a divide-and-conquer approach applied to the MapOutputBuffer’s sort,
which operates on an entire chunk of 256 or 512 MB of data space. The primary assumption is
that for a significant number of Hadoop installations the CPU is underutilized by the task tracker,
with the fundamental limit for the map slots being the disks available.
The DefaultSorter uses two buffers, kvbuffer (data) and kvmeta (metadata), which grow from
different parts of the large buffer allocated for operations.
The PipelinedSorter uses a series of similar buffer pairs which grow only in one direction and
involve no looping around back to the front. Each buffer pair is wrapped in a container class (SortSpan).
The allocation system preallocates 16 MB for the first kvmeta (kvmeta0) and then proceeds to mark the rest
of the entire buffer for kvbuffer0. The collect proceeds normally until either the entire buffer is full
or kvmeta0 acquires 1M items (16 bytes per item, 1024 x 1024 items).
At this point, kvbuffer0 is marked off as full. The remaining buffer is now allocated according
to the per-item ratio derived from kvmeta0.size() vs kvbuffer0.size(). If the per-item size is larger than
expected, then kvmeta1 will have fewer items in the reserved space (so less than 16 MB of space). The
collect thread proceeds to switch to use only kvmeta1 and kvbuffer1 (which is again the length of
the whole remaining buffer).
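The sizing rule above can be sketched roughly as follows. This is a minimal illustration, not the actual allocation code: the class name, the method name nextMetaBytes and the exact rounding are assumptions, and only the 16 bytes per item and the 1M item cap come from the description above.

```java
final class SpanSizingSketch {
    static final int META_BYTES_PER_ITEM = 16;          // from the text: 16 bytes per item
    static final int MAX_ITEMS_PER_SPAN = 1024 * 1024;  // from the text: 1M items per kvmeta

    /**
     * Hypothetical helper: how many bytes to reserve for the next kvmeta,
     * given the bytes still free and what the previous span observed.
     */
    static long nextMetaBytes(long remainingBytes, long prevDataBytes, long prevItems) {
        long fullMeta = (long) MAX_ITEMS_PER_SPAN * META_BYTES_PER_ITEM; // 16 MB
        if (prevItems <= 0) {
            // No history yet: reserve the default 16 MB, or whatever is left.
            return Math.min(remainingBytes, fullMeta);
        }
        // Average payload bytes per item in the previous span, rounded up.
        long perItem = Math.max(1, (prevDataBytes + prevItems - 1) / prevItems);
        // Every item costs its payload plus one metadata slot.
        long items = remainingBytes / (perItem + META_BYTES_PER_ITEM);
        items = Math.min(items, MAX_ITEMS_PER_SPAN);
        return items * META_BYTES_PER_ITEM;
    }
}
```

Under these assumptions, with 100 MB remaining and an observed average of 240 bytes per item, only 409,600 items fit, so the next kvmeta reserves 6.25 MB instead of the full 16 MB.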
A separate executor service (sortmaster) hosts a thread pool for sorting the data that has
been collected into kvmeta0 & kvbuffer0. The sortmaster.submit call returns a future result which
is added to the merge heap, even though the result has not been evaluated yet.
Since every single sort span is share-nothing in nature, the sort operations can happen in
parallel & out of order for any number of spans without any dependency conflicts. The only
criterion is that the comparator operates in a thread-safe manner. Most binary comparators do,
but the default thread count is 1 to be safe.
The spill is triggered when the last span cannot hold any more items or the task is over. The spill
needs to wait for all the sort futures to return before it can attempt a merge to disk. There is no
lock here except for the implicit one in Future::get().
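The submit-then-wait pattern described above looks roughly like this with plain java.util.concurrent. It is only a sketch: spans are stood in for by int arrays, and the class name and sortAll are made up for illustration. The variable is called sortmaster only to mirror the text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SortFutureSketch {
    /** Sorts every span on the pool and returns once all futures have completed. */
    static List<int[]> sortAll(List<int[]> spans, int threads) {
        ExecutorService sortmaster = Executors.newFixedThreadPool(threads);
        try {
            List<Future<int[]>> futures = new ArrayList<>();
            for (int[] span : spans) {
                // Each span is share-nothing, so the tasks need no coordination.
                futures.add(sortmaster.submit(() -> {
                    Arrays.sort(span);
                    return span;
                }));
            }
            List<int[]> sorted = new ArrayList<>();
            for (Future<int[]> f : futures) {
                // The only synchronization point: get() blocks until that sort is done.
                sorted.add(f.get());
            }
            return sorted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a sort", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("sort task failed", e.getCause());
        } finally {
            sortmaster.shutdown();
        }
    }
}
```

The futures are collected in submission order, so the spill can walk them in order even if the sorts themselves finish out of order.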
The merge operation is a simple java.util.PriorityQueue with one code fast path. The merger
provides a partition filter, an iterator wrapper which will return false when the current
partition != the filter provided. This means that the data written to IFile is being streamed from the
in-memory merge instead of being merged prior to the write.
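A heap-based merge with a partition filter can be sketched as below. This is a simplified illustration under stated assumptions, not the sorter's actual classes: Rec, Cursor, Merger and drainPartition are hypothetical names, keys are plain ints, and the filter signals the end of a partition by returning null rather than false.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class SpanMergeSketch {
    /** Hypothetical record: a partition id plus an int key. */
    static final class Rec {
        final int partition;
        final int key;
        Rec(int partition, int key) { this.partition = partition; this.key = key; }
    }

    /** Read position inside one span that is already sorted by (partition, key). */
    static final class Cursor {
        final List<Rec> span;
        int pos = 0;
        Cursor(List<Rec> span) { this.span = span; }
        Rec peek() { return span.get(pos); }
        boolean advance() { return ++pos < span.size(); }
    }

    static final Comparator<Rec> ORDER =
        Comparator.comparingInt((Rec r) -> r.partition).thenComparingInt(r -> r.key);

    /** Lazily merges sorted spans; nothing is materialized ahead of the reader. */
    static final class Merger {
        private final PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> ORDER.compare(a.peek(), b.peek()));

        Merger(List<List<Rec>> spans) {
            for (List<Rec> s : spans) {
                if (!s.isEmpty()) heap.add(new Cursor(s));
            }
        }

        /** Next record of the given partition, or null once the partition changes. */
        Rec nextInPartition(int partition) {
            Cursor least = heap.peek();
            if (least == null || least.peek().partition != partition) return null;
            heap.poll();                       // remove before mutating the cursor
            Rec r = least.peek();
            if (least.advance()) heap.add(least);
            return r;
        }
    }

    /** Streams one partition out of the merger, as a writer to disk would. */
    static List<Integer> drainPartition(Merger m, int partition) {
        List<Integer> keys = new ArrayList<>();
        for (Rec r = m.nextInPartition(partition); r != null; r = m.nextInPartition(partition)) {
            keys.add(r.key);
        }
        return keys;
    }
}
```

Draining partition 0, then partition 1, and so on from the same Merger yields each partition's records in key order without ever building a fully merged list first.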
The merge for the SortSpan case is unique among all the other merges in Hadoop, because it
actually has random access (because it is in memory) into the various lists it is merging.
bisect: If two keys are read from the same sort span consecutively, it tries to do a bisect()
operation with the least key from the second least sort span. This does a binary search for
second.key() in the first sort span, trying to avoid comparing against a large number of keys. The
bisect returns the offset to which the least sort span can “gallop” before performing another
compare. This is especially useful in scenarios where a huge majority of the keys are identical or
if the input key list is already in order.
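The bisect step amounts to an upper-bound binary search inside the least span. The sketch below uses int keys and a hypothetical gallopTo method. Treating keys equal to the bound as safe to emit is an assumption about tie handling, not something the description above specifies.

```java
final class BisectSketch {
    /**
     * Returns the first index in keys[from, to) whose value is greater than bound.
     * Every record before that index can be emitted from this span without
     * comparing against the second least span again.
     */
    static int gallopTo(int[] keys, int from, int to, int bound) {
        int lo = from;
        int hi = to;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;   // overflow-safe midpoint
            if (keys[mid] <= bound) {
                lo = mid + 1;            // mid is still safe to emit
            } else {
                hi = mid;                // mid must wait for a real compare
            }
        }
        return lo;
    }
}
```

For a span of n remaining keys this costs about log2(n) comparisons instead of one comparison per record, which is where the gain comes from on runs of identical or pre-sorted keys.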
RLE: The number of comparisons that returned == is tracked during the
sort and the merge, which comes in useful in the output phase. The data being dumped to disk is
RLE encoded if key equality comes up at least 10% of the time. This optimization is
useful when the operation is something like a join, where the keys repeat a large number of
times in the right table in the map-join case.
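As a rough illustration of the decision and its effect on the output, consider the sketch below. The names are hypothetical, the "<REPEAT>" string only stands in for whatever marker the file format really uses, and taking the 10% over the total number of comparisons is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class RleSketch {
    static final String REPEAT = "<REPEAT>"; // placeholder marker, not the real on-disk encoding

    /** True when at least 10% of the observed comparisons came back equal. */
    static boolean shouldUseRle(long equalCompares, long totalCompares) {
        return totalCompares > 0 && equalCompares * 10 >= totalCompares;
    }

    /** Replaces each key that repeats its predecessor with the marker. */
    static List<String> encode(List<String> sortedKeys) {
        List<String> out = new ArrayList<>();
        String prev = null;
        for (String key : sortedKeys) {
            out.add(key.equals(prev) ? REPEAT : key);
            prev = key;
        }
        return out;
    }
}
```

The counter costs nothing extra because the sort and merge already perform these comparisons. The writer only has to consult the ratio once before choosing the encoding.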
The PipelinedSorter thus brings in two new features to the system: an ability to run multiple sort
threads & a slightly better way of dealing with already sorted lists through the bisect.
IFile modifications & future use
fast-forward-merge: The RLE key scenario actually shines when it comes to merging the
various spills together (not implemented yet), since the number of key comparisons during the
merge is unnecessarily large when the same key is repeated a large number of times. So as
long as the same key is being repeated, no further key comparisons are required as we can
fast-forward that data into the merged file. To turn on RLE on the merged output without forcing
more comparisons, a new magic DataInputBuffer, IFile.REPEAT_KEY, has been added, which
will forward the RLE information from the input file into the output file without any comparison
operations when fast-forwarding through a merge.
bisect-bulk-merge: Similar to keeping the index record in the cache list during the merge,
another approach to bring the bisect() behaviour to the spill merges is to keep track of 32 keys
equally spaced in the spill file to check if we can gallop through any of the disk files while
merging. This can potentially be held in the map container memory instead of being written to
disk, as it is by necessity a random access feature. Files without a bisect cache entry will
fall back to the present behaviour, but the real win is in skipping comparison operations during a
disk merge and being able to literally copy all data from the current position till the bisect position
without ever doing per-record reads or compares.
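Since this part is a proposal rather than existing code, the following is only a speculative sketch of how such a bisect cache might be consulted. Record positions stand in for byte offsets in the spill file, keys are ints, and both method names are invented for illustration.

```java
final class BisectCacheSketch {
    static final int SAMPLES = 32; // from the text: 32 equally spaced keys

    /** Picks up to SAMPLES equally spaced record positions in a sorted spill. */
    static int[] samplePositions(int recordCount) {
        int n = Math.min(SAMPLES, recordCount);
        int[] pos = new int[n];
        for (int i = 0; i < n; i++) {
            // Spread the samples over the file; the last sample is the last record.
            pos[i] = (int) ((long) (i + 1) * recordCount / n) - 1;
        }
        return pos;
    }

    /**
     * Returns the exclusive end of the run starting at current that can be
     * bulk-copied: every record before it has a key <= bound.
     * Returns current itself when no sampled key allows a gallop.
     */
    static int bulkCopyEnd(int[] sortedKeys, int[] samples, int current, int bound) {
        int end = current;
        for (int p : samples) {
            if (p < current) continue;          // sample already behind the reader
            if (sortedKeys[p] > bound) break;   // later samples are larger still
            end = p + 1;                        // everything up to p is <= bound
        }
        return end;
    }
}
```

In this sketch sortedKeys is only read at the sampled positions, which models a cache holding just those 32 keys in memory. The records between current and the returned position would then be copied as raw bytes.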