Created on 03-31-2017 05:17 PM - edited 08-17-2019 01:27 PM
The pipelined sorter is a divide-and-conquer approach applied to the MapOutputBuffer’s sort, which operates on an entire 256 MB or 512 MB chunk of buffer space at once. The primary assumption is that in a significant number of Hadoop installations the task tracker underutilizes the CPU, while the fundamental limit on the number of map slots is the number of available disks.
The DefaultSorter uses two buffers, kvbuffer (data) and kvmeta (metadata), which grow from different positions within the single large buffer allocated for sorting.
The PipelinedSorter instead uses a series of similar buffer pairs, each of which grows in only one direction and never wraps around to the front. Each buffer pair is wrapped in a container class named SortSpan.
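To make the one-directional layout concrete, here is a minimal sketch of a span under a simplified design. SortSpanSketch, its collect() method, and the 16-byte metadata entry (partition, data offset, key length, value length) are hypothetical illustrations, not the Tez SortSpan API. Both regions are carved from one parent ByteBuffer and only ever append, so a full span reports failure instead of wrapping around.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified SortSpan: a metadata region followed by a data region,
// both carved from one parent buffer and both append-only (no wrap-around).
final class SortSpanSketch {
    static final int META_SIZE = 16; // assumed layout: partition, data offset, key length, value length

    final ByteBuffer kvmeta;   // fixed-size entries, written front to back
    final ByteBuffer kvbuffer; // serialized key/value bytes, written front to back

    SortSpanSketch(ByteBuffer source, int maxItems) {
        ByteBuffer meta = source.duplicate();          // shares content, independent position/limit
        meta.limit(meta.position() + maxItems * META_SIZE);
        kvmeta = meta.slice();                         // first maxItems * 16 bytes
        ByteBuffer data = source.duplicate();
        data.position(meta.limit());
        kvbuffer = data.slice();                       // everything after the metadata
    }

    /** Appends one record; returns false when either region is full, instead of wrapping around. */
    boolean collect(int partition, byte[] key, byte[] value) {
        int need = key.length + value.length;
        if (kvmeta.remaining() < META_SIZE || kvbuffer.remaining() < need) {
            return false;
        }
        int offset = kvbuffer.position();
        kvbuffer.put(key).put(value);
        kvmeta.putInt(partition).putInt(offset).putInt(key.length).putInt(value.length);
        return true;
    }

    int items() {
        return kvmeta.position() / META_SIZE;
    }
}
```

When collect() returns false, the caller would close this span and open the next one in the space that is left, which is the step the allocation description covers.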
The allocation system preallocates 16 MB for the first kvmeta (kvmeta0) and marks the rest of the buffer for kvbuffer0. Collection proceeds normally until either kvbuffer0 is full or kvmeta0 holds 1M items (16 bytes per item, 1024 × 1024 items).
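The first span's sizing is plain arithmetic: 16 bytes × 1,048,576 items = 16 MB of metadata, and everything else goes to data. This sketch uses hypothetical names (FirstSpanLayout, dataCapacity, spanFull) to show the two conditions that close span 0.

```java
// Hypothetical helper showing the first span's fixed layout.
final class FirstSpanLayout {
    static final int META_BYTES_PER_ITEM = 16;                                // kvmeta entry size
    static final int MAX_ITEMS = 1024 * 1024;                                 // 1M items per span
    static final long META_RESERVE = (long) META_BYTES_PER_ITEM * MAX_ITEMS;  // 16 MB for kvmeta0

    /** Bytes left for kvbuffer0 after kvmeta0 is carved out of the sort buffer. */
    static long dataCapacity(long totalBufferBytes) {
        return totalBufferBytes - META_RESERVE;
    }

    /** Span 0 is closed once kvmeta0 holds 1M items or the next record does not fit in kvbuffer0. */
    static boolean spanFull(long dataUsed, long dataCapacity, int items, int nextRecordBytes) {
        return items >= MAX_ITEMS || dataUsed + nextRecordBytes > dataCapacity;
    }
}
```

For a 256 MB sort buffer, dataCapacity gives kvbuffer0 240 MB.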
At this point, kvbuffer0 is marked as full. The remaining buffer space is then divided according to the per-item ratio derived from kvmeta0.size() and kvbuffer0.size(), that is, the average number of data bytes per record. If records are larger than expected, kvmeta1 reserves space for fewer items (so less than 16 MB). The collect thread then switches to using only kvmeta1 and kvbuffer1 (kvbuffer1 again takes up the rest of the remaining buffer).
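A sketch of how the next span's metadata reservation could be derived from the previous span's average record size. The exact formula is an assumption: reserve for at most as many items as the previous span held, and fewer when the remaining space cannot fit that many records at the observed size. NextSpanLayout and metaReserve are hypothetical names.

```java
// Hypothetical sizing rule for the next span's kvmeta, based on the previous span.
final class NextSpanLayout {
    static final int META_BYTES_PER_ITEM = 16;

    /**
     * Bytes to reserve for the next kvmeta. Assumption: reserve for at most as many items
     * as the previous span held, and fewer when the remaining space cannot fit that many
     * records at the previous span's average record size.
     */
    static long metaReserve(long remainingBytes, long prevDataBytes, int prevItems) {
        long perItem = Math.max(1L, prevDataBytes / Math.max(1, prevItems)); // observed bytes per record
        long fit = remainingBytes / (perItem + META_BYTES_PER_ITEM);         // records that fit, meta included
        return Math.min(prevItems, fit) * META_BYTES_PER_ITEM;
    }
}
```

With 960-byte records observed in span 0 and 100 MB left, this reserves 107,436 entries (about 1.6 MB of kvmeta) instead of 16 MB; with 100-byte records it keeps the full 16 MB.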
A separate executor service (sortmaster) hosts a thread pool that sorts the data collected into kvmeta0 and kvbuffer0. sortmaster.submit() returns a Future, which is added to the merge heap before the result has been evaluated.
Since every sort span is share-nothing, any number of spans can be sorted in parallel and out of order without dependency conflicts. The only requirement is that the comparator be thread-safe; most binary comparators are, but the default thread count is 1 to be safe.
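Because spans share nothing, sorting them is an embarrassingly parallel job for an ExecutorService. This minimal sketch uses int arrays in place of spans and Arrays.sort in place of the real comparator-driven sort; SortMasterSketch, newSortMaster and submitAll are hypothetical names. The futures are returned unevaluated, mirroring how the sorter hands them to the merge heap before the sort has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical "sortmaster": int arrays stand in for spans, Arrays.sort for the span sort.
final class SortMasterSketch {
    /** One thread is the safe choice unless the comparator is known to be thread-safe. */
    static ExecutorService newSortMaster(int threads) {
        return Executors.newFixedThreadPool(threads);
    }

    /** Submits every span and returns the futures in span order, without waiting on any of them. */
    static List<Future<int[]>> submitAll(ExecutorService sortmaster, List<int[]> spans) {
        List<Future<int[]>> pending = new ArrayList<>();
        for (int[] span : spans) {
            // Each task touches only its own span, so tasks may run concurrently and finish in any order.
            pending.add(sortmaster.submit(() -> {
                Arrays.sort(span);
                return span;
            }));
        }
        return pending;
    }
}
```

Passing 1 to newSortMaster reproduces the conservative default; raising it is only safe with a thread-safe comparator.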
A spill is triggered when the last span cannot hold any more items or when the task finishes. The spill must wait for all the sort futures to complete before it can merge to disk. There is no lock here other than the implicit one in Future::get().
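The spill-side wait can be sketched as a loop over Future.get(): whatever order the sorts finish in, the loop returns only after every span is sorted, and no other lock is involved. SpillBarrier and awaitSorted are hypothetical names; converting the checked exceptions into IllegalStateException is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical spill barrier: Future.get() is the only synchronization point.
final class SpillBarrier {
    /** Returns the sorted spans once every sort has completed, in span order. */
    static <T> List<T> awaitSorted(List<? extends Future<T>> sorts) {
        List<T> done = new ArrayList<>(sorts.size());
        try {
            for (Future<T> f : sorts) {
                done.add(f.get()); // blocks until this span is sorted; completion order does not matter
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for span sorts", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a span sort failed", e.getCause());
        }
        return done;
    }
}
```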
The merge operation is a simple heap built on java.util.PriorityQueue, with one code fast path. The merger provides a partition filter, an iterator wrapper that returns false as soon as the current record's partition differs from the filter's partition. This means data is streamed into the IFile directly from the in-memory merge, rather than being fully merged before the write.
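A minimal sketch of a heap-based merge with a partition filter, assuming each span holds {partition, key} int pairs already sorted by partition and then key. SpanMergeSketch, Cursor, heapOf and mergePartition are hypothetical names; java.util.PriorityQueue provides the heap. Each call drains one partition and stops at the first record of the next one, which is the point where a writer could finish that partition's output.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory span merge; each record is an int pair {partition, key}.
final class SpanMergeSketch {
    /** Read position inside one sorted span. */
    static final class Cursor {
        final int[][] span;
        int pos;
        Cursor(int[][] span) { this.span = span; }
        int[] head() { return span[pos]; }
    }

    // Heap order: partition first, then key (the order each span was sorted in).
    static final Comparator<Cursor> ORDER =
        Comparator.<Cursor>comparingInt(c -> c.head()[0]).thenComparingInt(c -> c.head()[1]);

    static PriorityQueue<Cursor> heapOf(List<int[][]> spans) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(ORDER);
        for (int[][] s : spans) {
            if (s.length > 0) heap.add(new Cursor(s));
        }
        return heap;
    }

    /** Streams one partition's keys out of the heap; the filter stops at the first record of another partition. */
    static List<Integer> mergePartition(PriorityQueue<Cursor> heap, int partition) {
        List<Integer> out = new ArrayList<>(); // stands in for the IFile writer
        while (!heap.isEmpty() && heap.peek().head()[0] == partition) {
            Cursor c = heap.poll();
            out.add(c.head()[1]);
            if (++c.pos < c.span.length) heap.add(c); // re-insert with its next record as the head
        }
        return out;
    }
}
```

Calling mergePartition for partitions 0, 1, 2, … in order walks the whole heap once, so nothing is materialized beyond the current record.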
The merge for the SortSpan case is unique among the merges in Hadoop because it has random access into the lists it is merging, since they are all in memory.
bisect: If two keys are read consecutively from the same sort span, the merger tries a bisect() operation against the least key of the second-least sort span. This performs a binary search for second.key() in the first sort span, to avoid comparing against a large number of keys one at a time. bisect() returns the offset to which the least sort span can “gallop” before another comparison is needed. This is especially useful when a large majority of the keys are identical or when the input key list is already in order.
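The bisect fast path can be sketched for two in-memory spans: when the same span wins twice in a row, a binary search finds how far it can gallop past the other span's head, and that whole run is emitted without further compares. This is a simplified two-way version with int keys, assuming an upper-bound search (keys equal to the other head are taken from the galloping span first); BisectSketch, bisect and merge are hypothetical names, and a real merger would apply the same idea to its heap of spans.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-way merge with a bisect() gallop, over sorted int spans.
final class BisectSketch {
    /** First index in span[from..] whose key is greater than bound: how far the least span can gallop. */
    static int bisect(int[] span, int from, int bound) {
        int lo = from, hi = span.length; // search window [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (span[mid] <= bound) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    /** Merges two sorted spans, switching to bisect() when the same span wins twice in a row. */
    static List<Integer> merge(int[] a, int[] b) {
        int[][] span = {a, b};
        int[] pos = {0, 0};
        List<Integer> out = new ArrayList<>();
        int last = -1; // span that produced the previous record
        while (pos[0] < a.length && pos[1] < b.length) {
            int w = a[pos[0]] <= b[pos[1]] ? 0 : 1; // least span
            int o = 1 - w;                          // second-least span
            if (w == last) {
                // Consecutive win: gallop to the first key greater than the other span's head.
                int end = bisect(span[w], pos[w], span[o][pos[o]]);
                while (pos[w] < end) out.add(span[w][pos[w]++]);
            } else {
                out.add(span[w][pos[w]++]);
            }
            last = w;
        }
        for (int s = 0; s < 2; s++) {
            while (pos[s] < span[s].length) out.add(span[s][pos[s]++]);
        }
        return out;
    }
}
```

For example, merging {1, 2, 3, 4, 10} with {5, 6} emits keys 2, 3 and 4 through one bisect() call rather than one comparison each.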
RLE: The number of comparisons that returned equality (==) is tracked during the sort and the merge, and this count is used in the output phase. The data written to disk is RLE-encoded if key equality comes up in at least 10% of comparisons. This optimization pays off when the operation is something like a join, where in the map-join case the keys of the right table repeat many times.
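The bookkeeping behind the RLE decision amounts to a counter wrapped around the comparator. A minimal sketch, assuming int keys and the 10% threshold from the text; RleDecision, countingCompare and useRle are hypothetical names, and the encoding itself is left to the IFile writer.

```java
// Hypothetical equality counter behind the RLE decision (int keys for simplicity).
final class RleDecision {
    static final double RLE_THRESHOLD = 0.10; // encode when at least 10% of compares were ==

    private long compares;
    private long equal;

    /** Compares two keys and records whether the result was equality. */
    int countingCompare(int x, int y) {
        int c = Integer.compare(x, y);
        compares++;
        if (c == 0) {
            equal++;
        }
        return c;
    }

    /** True when the output phase should RLE-encode the keys. */
    boolean useRle() {
        return compares > 0 && (double) equal / compares >= RLE_THRESHOLD;
    }
}
```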
The PipelinedSorter thus brings two new features to the system: the ability to run multiple sort threads, and a somewhat better way of handling already-sorted lists through bisect.
IFile modifications & future use
fast-forward-merge: The RLE key scenario really shines when merging the various spills together (not implemented yet), because the number of key comparisons during that merge is unnecessarily large when the same key repeats many times. As long as the same key keeps repeating, no further key comparisons are required, since that data can be fast-forwarded into the merged file. To keep RLE on the merged output without forcing more comparisons, a new magic DataInputBuffer, IFile.REPEAT_KEY, has been added; it forwards the RLE information from the input file to the output file without any comparison operations while fast-forwarding through a merge.
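The fast-forward idea can be sketched as a two-way merge over RLE spills in which each record carries a flag saying whether it repeats the previous key in the same file. This is a hypothetical illustration of a feature the text describes as not yet implemented: FastForwardMerge and Entry are invented names, the boolean flag stands in for the IFile.REPEAT_KEY marker, and the comparison counter exists only to show the savings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fast-forward merge of two RLE spills (int keys, boolean repeat flag).
final class FastForwardMerge {
    /** One record in an RLE spill: its key, and whether it repeats the previous key in that file. */
    static final class Entry {
        final int key;
        final boolean repeat;
        Entry(int key, boolean repeat) { this.key = key; this.repeat = repeat; }
    }

    int comparisons; // key comparisons performed, to show what fast-forwarding saves

    /** Two-way merge in which repeated keys are copied without comparisons. */
    List<Entry> merge(List<Entry> a, List<Entry> b) {
        List<List<Entry>> in = new ArrayList<>();
        in.add(a);
        in.add(b);
        int[] pos = {0, 0};
        List<Entry> out = new ArrayList<>();
        while (pos[0] < a.size() && pos[1] < b.size()) {
            comparisons++;
            int w = a.get(pos[0]).key <= b.get(pos[1]).key ? 0 : 1;
            List<Entry> src = in.get(w);
            // Head of a run: marking it as a repeat would need a compare, so write its full key.
            out.add(new Entry(src.get(pos[w]++).key, false));
            // Fast-forward: the following repeats carry the key just written, which is already
            // <= the other file's head, so they are forwarded with their repeat flag and no compare.
            while (pos[w] < src.size() && src.get(pos[w]).repeat) {
                out.add(src.get(pos[w]++));
            }
        }
        for (int s = 0; s < 2; s++) {
            List<Entry> src = in.get(s);
            while (pos[s] < src.size()) out.add(src.get(pos[s]++));
        }
        return out;
    }
}
```

Merging [1, 1, 1, 1, 4] (the last three 1s flagged as repeats) with [2, 3] takes 3 key comparisons, where an ordinary two-way merge would take 6.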
bisect-bulk-merge: Similar to keeping the index record in the cache list during the merge, another way to bring the bisect() behaviour to spill merges is to record 32 equally spaced keys from each spill file and use them to check whether the merge can gallop through any of the disk files. This cache could be held in the map container's memory instead of being written to disk, since it is by necessity a random-access feature. Files without a bisect cache entry fall back to the present behaviour, but the real win is in skipping comparisons during a disk merge and copying all data from the current position up to the bisect position without any per-record reads or compares.
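A bisect cache for one spill file can be sketched as 32 evenly spaced (position, key) samples. Because the file is sorted, if a sample's key is no greater than the least key among the other inputs, every record from the current position through that sample can be bulk-copied without reading or comparing it. BisectCache and copyLimit are hypothetical names; record indices stand in for the byte offsets a real spill file would use, and the cache shrinks to fewer samples for files with fewer than 32 records.

```java
// Hypothetical bisect cache for one sorted spill file.
final class BisectCache {
    static final int SAMPLES = 32;

    private final int[] index; // record positions of the samples (byte offsets in a real file)
    private final int[] key;   // key stored at each sampled position

    BisectCache(int[] sortedKeys) {
        int n = Math.min(SAMPLES, sortedKeys.length);
        index = new int[n];
        key = new int[n];
        for (int s = 0; s < n; s++) {
            int pos = (int) ((long) s * sortedKeys.length / n); // equally spaced positions
            index[s] = pos;
            key[s] = sortedKeys[pos];
        }
    }

    /**
     * Exclusive end of the run starting at {@code from} that is known to be <= bound,
     * found from cached samples alone; returns {@code from} when no sample helps.
     */
    int copyLimit(int from, int bound) {
        int limit = from;
        for (int s = 0; s < index.length; s++) {
            // Keys are sorted, so a sample <= bound means every record from 'from' up to it is <= bound too.
            if (index[s] >= from && key[s] <= bound) {
                limit = index[s] + 1;
            }
        }
        return limit;
    }
}
```

When copyLimit returns its from argument, no sample helps and the merge falls back to per-record compares, matching the behaviour described for files without a cache entry. A linear scan is used because 32 entries are cheap to check; a binary search over the samples would also work.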