Implement a faster stable sort algorithm #90545

Closed
wants to merge 3 commits into from
270 changes: 141 additions & 129 deletions library/alloc/src/slice.rs
@@ -249,10 +249,10 @@ impl<T> [T] {
///
/// # Current implementation
///
/// The current algorithm is an adaptive, iterative merge sort inspired by
/// [timsort](https://en.wikipedia.org/wiki/Timsort).
/// It is designed to be very fast in cases where the slice is nearly sorted, or consists of
/// two or more sorted sequences concatenated one after another.
/// This is a stable, two-stage merge sort with a pre-sorted-prefix optimization.
/// It is quite fast in cases where the slice is nearly sorted or consists of
/// two or more sorted sequences concatenated one after another, while remaining very fast
/// for randomly ordered sequences.
///
/// Also, it allocates temporary storage half the size of `self`, but for short slices a
/// non-allocating insertion sort is used instead.
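
As a quick aside (standard-library usage, not part of the diff), the stability guarantee documented above means that elements comparing equal keep their original relative order:

```rust
fn main() {
    // Keys 0 and 1 each appear twice; a stable sort must preserve the
    // original relative order of entries with equal keys.
    let mut v = [(1, 'b'), (0, 'a'), (1, 'a'), (0, 'b')];
    v.sort_by_key(|&(k, _)| k);
    assert_eq!(v, [(0, 'a'), (0, 'b'), (1, 'b'), (1, 'a')]);
}
```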
@@ -302,10 +302,10 @@ impl<T> [T] {
///
/// # Current implementation
///
/// The current algorithm is an adaptive, iterative merge sort inspired by
/// [timsort](https://en.wikipedia.org/wiki/Timsort).
/// It is designed to be very fast in cases where the slice is nearly sorted, or consists of
/// two or more sorted sequences concatenated one after another.
/// This is a stable, two-stage merge sort with a pre-sorted-prefix optimization.
/// It is quite fast in cases where the slice is nearly sorted or consists of
/// two or more sorted sequences concatenated one after another, while remaining very fast
/// for randomly ordered sequences.
///
/// Also, it allocates temporary storage half the size of `self`, but for short slices a
/// non-allocating insertion sort is used instead.
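
The same documentation applies to the comparator-based form; for reference, typical `sort_by` usage (standard API, not part of the diff):

```rust
fn main() {
    let mut v = [5, 4, 1, 3, 2];
    v.sort_by(|a, b| a.cmp(b));
    assert_eq!(v, [1, 2, 3, 4, 5]);
    // Flipping the comparator yields descending order.
    v.sort_by(|a, b| b.cmp(a));
    assert_eq!(v, [5, 4, 3, 2, 1]);
}
```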
@@ -879,34 +879,37 @@ impl<T: Clone> ToOwned for [T] {
// Sorting
////////////////////////////////////////////////////////////////////////////////

/// Inserts `v[0]` into pre-sorted sequence `v[1..]` so that whole `v[..]` becomes sorted.
/// Inserts `v[v.len() - 1]` into the pre-sorted sequence `v[..v.len() - 1]` so that the whole of `v[..]` becomes sorted.
///
/// This is the integral subroutine of insertion sort.
#[cfg(not(no_global_oom_handling))]
fn insert_head<T, F>(v: &mut [T], is_less: &mut F)
// Benchmarking indicated that inlining yields a substantial improvement and costs only a couple of hundred bytes of code size.
#[inline(always)]
fn insert_end<T, F>(v: &mut [T], is_less: &mut F)
where
F: FnMut(&T, &T) -> bool,
{
if v.len() >= 2 && is_less(&v[1], &v[0]) {
let end = v.len().saturating_sub(1);
if end > 0 && is_less(&v[end], &v[end - 1]) {
unsafe {
// There are three ways to implement insertion here:
//
// 1. Swap adjacent elements until the first one gets to its final destination.
// 1. Swap adjacent elements until the last one gets to its final destination.
// However, this way we copy data around more than is necessary. If elements are big
// structures (costly to copy), this method will be slow.
//
// 2. Iterate until the right place for the first element is found. Then shift the
// elements succeeding it to make room for it and finally place it into the
// 2. Iterate until the right place for the last element is found. Then shift the
// elements preceding it to make room for it and finally place it into the
// remaining hole. This is a good method.
//
// 3. Copy the first element into a temporary variable. Iterate until the right place
// 3. Copy the last element into a temporary variable. Iterate until the right place
// for it is found. As we go along, copy every traversed element into the slot
// preceding it. Finally, copy data from the temporary variable into the remaining
// succeeding it. Finally, copy data from the temporary variable into the remaining
// hole. This method is very good. Benchmarks demonstrated slightly better
// performance than with the 2nd method.
//
// All methods were benchmarked, and the 3rd showed best results. So we chose that one.
let tmp = mem::ManuallyDrop::new(ptr::read(&v[0]));
let tmp = mem::ManuallyDrop::new(ptr::read(v.get_unchecked(end)));

// Intermediate state of the insertion process is always tracked by `hole`, which
// serves two purposes:
@@ -918,15 +921,14 @@ where
// If `is_less` panics at any point during the process, `hole` will get dropped and
// fill the hole in `v` with `tmp`, thus ensuring that `v` still holds every object it
// initially held exactly once.
let mut hole = InsertionHole { src: &*tmp, dest: &mut v[1] };
ptr::copy_nonoverlapping(&v[1], &mut v[0], 1);

for i in 2..v.len() {
if !is_less(&v[i], &*tmp) {
break;
}
ptr::copy_nonoverlapping(&v[i], &mut v[i - 1], 1);
hole.dest = &mut v[i];
let mut hole = InsertionHole { src: &*tmp, dest: v.get_unchecked_mut(end - 1) };
ptr::copy_nonoverlapping(hole.dest, v.get_unchecked_mut(end), 1);

let mut i = end - 1;
while i > 0 && is_less(&*tmp, v.get_unchecked(i - 1)) {
hole.dest = v.get_unchecked_mut(i - 1);
ptr::copy_nonoverlapping(hole.dest, v.get_unchecked_mut(i), 1);
i -= 1;
}
// `hole` gets dropped and thus copies `tmp` into the remaining hole in `v`.
}
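
As an aside for readers of the diff: the effect of `insert_end` can be modeled in safe Rust with `rotate_right`. A minimal sketch under a hypothetical name (`insert_end_safe`); the real routine above uses raw pointers precisely to avoid the bounds checks and extra moves this version incurs:

```rust
fn insert_end_safe<T, F: FnMut(&T, &T) -> bool>(v: &mut [T], is_less: &mut F) {
    let end = v.len().saturating_sub(1);
    if end == 0 {
        return;
    }
    // Walk left through the sorted prefix v[..end] to find the insertion
    // point for the last element.
    let mut i = end;
    while i > 0 && is_less(&v[end], &v[i - 1]) {
        i -= 1;
    }
    // Rotate the last element into place; v[i..end] shifts right by one.
    v[i..=end].rotate_right(1);
}
```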
Expand All @@ -953,7 +955,8 @@ where
/// # Safety
///
/// The two slices must be non-empty and `mid` must be in bounds. Buffer `buf` must be long enough
/// to hold a copy of the shorter slice. Also, `T` must not be a zero-sized type.
/// to hold a copy of the shorter slice.
#[allow(unused_unsafe)]
#[cfg(not(no_global_oom_handling))]
unsafe fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
where
@@ -1034,12 +1037,14 @@ where
// Finally, `hole` gets dropped. If the shorter run was not fully consumed, whatever remains of
// it will now be copied into the hole in `v`.

#[inline(always)]
unsafe fn get_and_increment<T>(ptr: &mut *mut T) -> *mut T {
let old = *ptr;
*ptr = unsafe { ptr.offset(1) };
old
}

#[inline(always)]
unsafe fn decrement_and_get<T>(ptr: &mut *mut T) -> *mut T {
*ptr = unsafe { ptr.offset(-1) };
*ptr
Expand All @@ -1063,140 +1068,147 @@ where
}
}
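
For intuition, a minimal safe-Rust model of what `merge` computes. `merge_simple` is a hypothetical helper that clones both runs into a temporary `Vec`; the real `merge` above copies only one run into the half-length scratch buffer and works through raw pointers:

```rust
fn merge_simple<T: Clone, F: FnMut(&T, &T) -> bool>(v: &mut [T], mid: usize, is_less: &mut F) {
    let mut merged = Vec::with_capacity(v.len());
    let (mut i, mut j) = (0, mid);
    while i < mid && j < v.len() {
        // Taking from the left run on ties (`!is_less(right, left)`) is
        // what makes the merge stable.
        if is_less(&v[j], &v[i]) {
            merged.push(v[j].clone());
            j += 1;
        } else {
            merged.push(v[i].clone());
            i += 1;
        }
    }
    // Append whichever run was not fully consumed.
    merged.extend_from_slice(&v[i..mid]);
    merged.extend_from_slice(&v[j..]);
    v.clone_from_slice(&merged);
}
```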

/// This merge sort borrows some (but not all) ideas from TimSort, which is described in detail
/// [here](https://github.com/python/cpython/blob/main/Objects/listsort.txt).
/// This is a stable, two-stage merge sort with a pre-sorted-prefix optimization. The two stages are:
///
/// The algorithm identifies strictly descending and non-descending subsequences, which are called
/// natural runs. There is a stack of pending runs yet to be merged. Each newly found run is pushed
/// onto the stack, and then some pairs of adjacent runs are merged until these two invariants are
/// satisfied:
/// 1) A top-down, recursive, depth-first merge, which improves data locality.
/// 2) A fast insertion sort for small slices, followed by merging.
///
/// 1. for every `i` in `1..runs.len()`: `runs[i - 1].len > runs[i].len`
/// 2. for every `i` in `2..runs.len()`: `runs[i - 2].len > runs[i - 1].len + runs[i].len`
///
/// The invariants ensure that the total running time is *O*(*n* \* log(*n*)) worst-case.
/// The total running time is *O*(*n* \* log(*n*)) worst-case.
#[cfg(not(no_global_oom_handling))]
fn merge_sort<T, F>(v: &mut [T], mut is_less: F)
where
F: FnMut(&T, &T) -> bool,
{
// The `gt!` macro centralizes and clarifies the comparison logic.
// Unchecked array access gives an approximate 20% performance improvement.
//
// # Safety
//
// `$left` and `$right` must be < `$v.len()`
macro_rules! gt {
($v: ident, $left: expr, $right: expr, $is_less: ident) => {
// $is_less(&$v[$right], &$v[$left])
$is_less(unsafe { $v.get_unchecked($right) }, unsafe { $v.get_unchecked($left) })
};
}

// Benchmarking determined these are the best sizes.
// The recursive merge switches to insertion sort / merge when the slice length is <= SMALL_SLICE_LEN*2.
const SMALL_SLICE_LEN: usize = 10;
// Slices of up to this length get sorted using insertion sort.
const MAX_INSERTION: usize = 20;
// Very short runs are extended using insertion sort to span at least this many elements.
const MIN_RUN: usize = 10;

// Sorting has no meaningful behavior on zero-sized types.
if size_of::<T>() == 0 {
return;
}

let len = v.len();

// Short arrays get sorted in-place via insertion sort to avoid allocations.
if len <= MAX_INSERTION {
if len >= 2 {
for i in (0..len - 1).rev() {
insert_head(&mut v[i..], &mut is_less);
}
for i in 1..len {
insert_end(&mut v[..=i], &mut is_less);
}
return;
}

// Allocate a buffer to use as scratch memory. We keep the length 0 so we can keep in it
// shallow copies of the contents of `v` without risking the dtors running on copies if
// `is_less` panics. When merging two sorted runs, this buffer holds a copy of the shorter run,
// which will always have length at most `len / 2`.
let mut buf = Vec::with_capacity(len / 2);

// In order to identify natural runs in `v`, we traverse it backwards. That might seem like a
// strange decision, but consider the fact that merges more often go in the opposite direction
// (forwards). According to benchmarks, merging forwards is slightly faster than merging
// backwards. To conclude, identifying runs by traversing backwards improves performance.
let mut runs = vec![];
let mut end = len;
while end > 0 {
// Find the next natural run, and reverse it if it's strictly descending.
let mut start = end - 1;
if start > 0 {
start -= 1;
unsafe {
if is_less(v.get_unchecked(start + 1), v.get_unchecked(start)) {
while start > 0 && is_less(v.get_unchecked(start), v.get_unchecked(start - 1)) {
start -= 1;
// `is_less` panics. When merging two slices, this buffer holds a copy of the right-hand slice,
// which will always have length at most `(len + 1) / 2`.
let mut buf = Vec::with_capacity((len + 1) / 2);
slice_merge_sort(v, 0, buf.as_mut_ptr(), &mut is_less);

// Do a recursive depth-first merge while the slice's length is greater than SMALL_SLICE_LEN*2.
// Below that length, use a combination of insertion sort and merging.
// As an optimization, `sorted` tracks how much of the slice's prefix is already sorted.
fn slice_merge_sort<T, F>(v: &mut [T], mut sorted: usize, buf_ptr: *mut T, is_less: &mut F)
where
F: FnMut(&T, &T) -> bool,
{
let len = v.len();
// Find the length of the already-sorted prefix.
if sorted == 0 {
sorted = if len <= 1 {
len
} else if gt!(v, 0, 1, is_less) {
// strictly descending
let mut i = 2;
while i < len && gt!(v, i - 1, i, is_less) {
i += 1;
}
// Reverse the slice so we don't have to sort it later.
v[..i].reverse();
i
} else {
// ascending
let mut i = 2;
while i < len && !gt!(v, i - 1, i, is_less) {
i += 1;
}
i
};
}

// Do merge sort, using `sorted` to avoid redundant sorting.
if sorted < len {
if len <= SMALL_SLICE_LEN + 2 {
for i in sorted..len {
insert_end(&mut v[..=i], is_less);
}
} else {
let mid;
if len > SMALL_SLICE_LEN * 2 {
mid = sorted.max(len / 2);
if sorted < mid {
slice_merge_sort(&mut v[..mid], sorted, buf_ptr, is_less);
}
slice_merge_sort(&mut v[mid..], 0, buf_ptr, is_less);
if !gt!(v, mid - 1, mid, is_less) {
return;
} else if gt!(v, 0, len - 1, is_less) {
// Every right-half element is smaller than every left-half element
// (the last element compares less than the first, and both halves are
// sorted), so swapping the halves completes the merge.
unsafe {
swap_slices(v, mid, buf_ptr);
}
return;
}
v[start..end].reverse();
} else {
while start > 0 && !is_less(v.get_unchecked(start), v.get_unchecked(start - 1))
{
start -= 1;
for i in sorted..SMALL_SLICE_LEN {
insert_end(&mut v[..=i], is_less);
}
for i in SMALL_SLICE_LEN + 1..len {
insert_end(&mut v[SMALL_SLICE_LEN..=i], is_less);
}
if !gt!(v, SMALL_SLICE_LEN - 1, SMALL_SLICE_LEN, is_less) {
return;
}
mid = SMALL_SLICE_LEN;
}
unsafe {
merge(v, mid, buf_ptr, is_less);
}
}
}
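
Putting the pieces together: a condensed safe-Rust model of the two-stage recursion, reusing `merge_simple` from the sketch above (`two_stage_sort` is a hypothetical name; the real code additionally threads the `sorted` prefix length and a shared scratch buffer through the recursion):

```rust
const SMALL_SLICE_LEN: usize = 10; // mirrors the PR's constant

fn two_stage_sort<T: Clone + Ord>(v: &mut [T]) {
    let len = v.len();
    if len <= SMALL_SLICE_LEN * 2 {
        // Stage 2: small slices fall back to insertion sort.
        for i in 1..len {
            let mut j = i;
            while j > 0 && v[j] < v[j - 1] {
                v.swap(j, j - 1);
                j -= 1;
            }
        }
        return;
    }
    // Stage 1: top-down, depth-first split and merge (good data locality).
    let mid = len / 2;
    two_stage_sort(&mut v[..mid]);
    two_stage_sort(&mut v[mid..]);
    // Skip the merge entirely when the halves are already in order.
    if v[mid] < v[mid - 1] {
        merge_simple(v, mid, &mut |a, b| a < b);
    }
}
```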

// Insert some more elements into the run if it's too short. Insertion sort is faster than
// merge sort on short sequences, so this significantly improves performance.
while start > 0 && end - start < MIN_RUN {
start -= 1;
insert_head(&mut v[start..end], &mut is_less);
}

// Push this run onto the stack.
runs.push(Run { start, len: end - start });
end = start;

// Merge some pairs of adjacent runs to satisfy the invariants.
while let Some(r) = collapse(&runs) {
let left = runs[r + 1];
let right = runs[r];
unsafe {
merge(
&mut v[left.start..right.start + right.len],
left.len,
buf.as_mut_ptr(),
&mut is_less,
);
}
runs[r] = Run { start: left.start, len: left.len + right.len };
runs.remove(r + 1);
}
}

// Finally, exactly one run must remain in the stack.
debug_assert!(runs.len() == 1 && runs[0].start == 0 && runs[0].len == len);

// Examines the stack of runs and identifies the next pair of runs to merge. More specifically,
// if `Some(r)` is returned, that means `runs[r]` and `runs[r + 1]` must be merged next. If the
// algorithm should continue building a new run instead, `None` is returned.
//
// TimSort is infamous for its buggy implementations, as described here:
// http://envisage-project.eu/timsort-specification-and-verification/
//
// The gist of the story is: we must enforce the invariants on the top four runs on the stack.
// Enforcing them on just top three is not sufficient to ensure that the invariants will still
// hold for *all* runs in the stack.
//
// This function correctly checks invariants for the top four runs. Additionally, if the top
// run starts at index 0, it will always demand a merge operation until the stack is fully
// collapsed, in order to complete the sort.
#[inline]
fn collapse(runs: &[Run]) -> Option<usize> {
let n = runs.len();
if n >= 2
&& (runs[n - 1].start == 0
|| runs[n - 2].len <= runs[n - 1].len
|| (n >= 3 && runs[n - 3].len <= runs[n - 2].len + runs[n - 1].len)
|| (n >= 4 && runs[n - 4].len <= runs[n - 3].len + runs[n - 2].len))
{
if n >= 3 && runs[n - 3].len < runs[n - 1].len { Some(n - 3) } else { Some(n - 2) }
} else {
None
/// Swaps the contents of the left-hand and right-hand slices divided at `mid`.
///
/// # Safety
///
/// `buf_ptr` must point to a buffer long enough to hold `v[mid..]`.
/// This holds at the call site because `buf` is sized as `(v.len() + 1) / 2` and
/// `mid == sorted.max(v.len() / 2)`, so `v[mid..].len() <= buf.len()`.
///
/// `mid` must be <= `v.len()`.
/// This holds because `swap_slices` is only called when `sorted < len` and `mid == sorted.max(len / 2)`.
#[allow(unused_unsafe)]
unsafe fn swap_slices<T>(v: &mut [T], mid: usize, buf_ptr: *mut T) {
let rlen = v.len() - mid;
let v_ptr = v.as_mut_ptr();
unsafe {
ptr::copy_nonoverlapping(v_ptr.add(mid), buf_ptr, rlen);
ptr::copy(v_ptr, v_ptr.add(rlen), mid);
ptr::copy_nonoverlapping(buf_ptr, v_ptr, rlen);
}
}
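
One more aside (not part of the diff): when every right-hand element precedes every left-hand element, the merge degenerates into swapping the two halves, which is exactly a left rotation:

```rust
fn main() {
    // Both halves are sorted, and each right-hand element is smaller
    // than each left-hand element.
    let mut v = [6, 7, 8, 9, 1, 2, 3, 4, 5];
    let mid = 4;
    v.rotate_left(mid); // same effect as `swap_slices(&mut v, mid, ...)`
    assert_eq!(v, [1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
```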

#[derive(Clone, Copy)]
struct Run {
start: usize,
len: usize,
}
}