工作笔记

Arrays(一) 排序

2019-11-21
阅读次数:

“Arrays排序源码阅读 。”

自然排序

LegacyMergeSor

(1)全排序
源码如下:

    /**
     * Sorts the specified array of objects into ascending order, according
     * to the {@linkplain Comparable natural ordering} of its elements.
     * All elements in the array must implement the {@link Comparable}
     * interface.  Furthermore, all elements in the array must be
     * <i>mutually comparable</i> (that is, {@code e1.compareTo(e2)} must
     * not throw a {@code ClassCastException} for any elements {@code e1}
     * and {@code e2} in the array).
     *
     * <p>This sort is guaranteed to be <i>stable</i>:  equal elements will
     * not be reordered as a result of the sort.
     *
     * <p>Implementation note: This implementation is a stable, adaptive,
     * iterative mergesort that requires far fewer than n lg(n) comparisons
     * when the input array is partially sorted, while offering the
     * performance of a traditional mergesort when the input array is
     * randomly ordered.  If the input array is nearly sorted, the
     * implementation requires approximately n comparisons.  Temporary
     * storage requirements vary from a small constant for nearly sorted
     * input arrays to n/2 object references for randomly ordered input
     * arrays.
     *
     * <p>The implementation takes equal advantage of ascending and
     * descending order in its input array, and can take advantage of
     * ascending and descending order in different parts of the the same
     * input array.  It is well-suited to merging two or more sorted arrays:
     * simply concatenate the arrays and sort the resulting array.
     *
     * <p>The implementation was adapted from Tim Peters's list sort for Python
     * (<a href="http://svn.python.org/projects/python/trunk/Objects/listsort.txt">
     * TimSort</a>).  It uses techniques from Peter McIlroy's "Optimistic
     * Sorting and Information Theoretic Complexity", in Proceedings of the
     * Fourth Annual ACM-SIAM Symposium on Discrete Algorithms, pp 467-474,
     * January 1993.
     *
     * @param a the array to be sorted
     * @throws ClassCastException if the array contains elements that are not
     *         <i>mutually comparable</i> (for example, strings and integers)
     * @throws IllegalArgumentException (optional) if the natural
     *         ordering of the array elements is found to violate the
     *         {@link Comparable} contract
     */
    public static void sort(Object[] a) {
        if (LegacyMergeSort.userRequested)
            legacyMergeSort(a);
        else
            ComparableTimSort.sort(a, 0, a.length, null, 0, 0);
    }

  • 根据Comparable自然排序进行升序排序
  • 数组中所有元素必须实现Comparable接口
  • 数组中任何两个元素相互可比较的(必须不能抛出ClassCastException)
  • 稳定排序
  • 该实现是一种稳定的自适应的迭代归并排序:当数组中有部分元素是已排序的,比较次数远小于n lg(n);当数组是随机排序的,提供传统的归并排序性能;当数组几乎已排序,大概需要n次比较
  • 需要的临时存储从数组几乎已排序的小常数到数组随机排序的 n/2 对象引用
  • 该实现在输入数组中同等利用升序和降序,并且可以在同一输入数组不同部分利用升序和降序优势。非常适合合并两个或多个已排序的数组:简单低将多个数组连接起来,并对结果数组进行排序
  • 该实现改编自Tim Peters针对Python的列表排序。它使用了Peter McIlroy的“乐观排序和信息理论复杂性”技术,该技术在1993年1月举行的第四届ACM-SIAM离散算法年会上发表,第467-474页。

sort有两个分支,当LegacyMergeSort.userRequested为true的情况下,采用legacyMergeSort(未来版本中会去除),否则采用 ComparableTimSort。

当${java.util.Arrays.useLegacyMergeSort}变量为true时,采用userRequested为true的情况下,采用legacyMergeSort。 源码如下:

    /** To be removed in a future release. */
    private static void legacyMergeSort(Object[] a) {
        Object[] aux = a.clone();
        mergeSort(aux, a, 0, a.length, 0);
    }

文档注释中说明了该方法在将来的版本中会被移除;然后调用 mergeSort(Object[] src, Object[] dest, int low, int high, int off),传入的参数分别为aux, a, 0, a.length, 0。 源码如下:

/**
     * Src is the source array that starts at index 0
     * Dest is the (possibly larger) array destination with a possible offset
     * low is the index in dest to start sorting
     * high is the end index in dest to end sorting
     * off is the offset to generate corresponding low, high in src
     * To be removed in a future release.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void mergeSort(Object[] src,
                                  Object[] dest,
                                  int low,
                                  int high,
                                  int off) {
        int length = high - low;

        // Insertion sort on smallest arrays
        if (length < INSERTIONSORT_THRESHOLD) {
            for (int i=low; i<high; i++)
                for (int j=i; j>low &&
                         ((Comparable) dest[j-1]).compareTo(dest[j])>0; j--)
                    swap(dest, j, j-1);
            return;
        }

        // Recursively sort halves of dest into src
        int destLow  = low;
        int destHigh = high;
        low  += off;
        high += off;
        int mid = (low + high) >>> 1;
        mergeSort(dest, src, low, mid, -off);
        mergeSort(dest, src, mid, high, -off);

        // If list is already sorted, just copy from src to dest.  This is an
        // optimization that results in faster sorts for nearly ordered lists.
        if (((Comparable)src[mid-1]).compareTo(src[mid]) <= 0) {
            System.arraycopy(src, low, dest, destLow, length);
            return;
        }

        // Merge sorted halves (now in src) into dest
        for(int i = destLow, p = low, q = mid; i < destHigh; i++) {
            if (q >= high || p < mid && ((Comparable)src[p]).compareTo(src[q])<=0)
                dest[i] = src[p++];
            else
                dest[i] = src[q++];
        }
    }

注释说明了以下几点:

  • src是下标从零开始的源数组
  • dest是排序后的目标数组
  • low是需要排序的起始下标
  • high是需要排序的终点下标
  • off是生成对应src中下标的偏移量
  • 在未来的版本中会移除

归并排序就是将一组数分割成两个子数组,再对子数组进行排序,然后再归并起来。

int length = high - low;

获取待排序的元素个数


    /**
     * Tuning parameter: list size at or below which insertion sort will be
     * used in preference to mergesort.
     * To be removed in a future release.
     */
    private static final int INSERTIONSORT_THRESHOLD = 7;

 // Insertion sort on smallest arrays
        if (length < INSERTIONSORT_THRESHOLD) {
            for (int i=low; i<high; i++)
                for (int j=i; j>low &&
                         ((Comparable) dest[j-1]).compareTo(dest[j])>0; j--)
                    swap(dest, j, j-1);
            return;
        }

若待排序的元素个数小于INSERTIONSORT_THRESHOLD = 7 时,相比于归并排序优先使用插入排序。
若待排序的元素个数不小于INSERTIONSORT_THRESHOLD>=7,则不使用插入排序。继续往下执行代码:

 // Recursively sort halves of dest into src
        int destLow  = low;
        int destHigh = high;
        low  += off;
        high += off;
        int mid = (low + high) >>> 1;
        mergeSort(dest, src, low, mid, -off);
        mergeSort(dest, src, mid, high, -off);

        // If list is already sorted, just copy from src to dest.  This is an
        // optimization that results in faster sorts for nearly ordered lists.
        if (((Comparable)src[mid-1]).compareTo(src[mid]) <= 0) {
            System.arraycopy(src, low, dest, destLow, length);
            return;
        }

        // Merge sorted halves (now in src) into dest
        for(int i = destLow, p = low, q = mid; i < destHigh; i++) {
            if (q >= high || p < mid && ((Comparable)src[p]).compareTo(src[q])<=0)
                dest[i] = src[p++];
            else
                dest[i] = src[q++];
        }

这里就是归并排序了,将待排序的数组一分为二,利用递归,不断的切分分组,当切分后的数组元素个数小于7时,使用插入排序,然后再归并,即可完成排序功能。
在归并时,若列表已经排序,直接将src复制到dest,对于几乎排序好的列表会更快。源码如下:

// If list is already sorted, just copy from src to dest.  This is an
        // optimization that results in faster sorts for nearly ordered lists.
        if (((Comparable)src[mid-1]).compareTo(src[mid]) <= 0) {
            System.arraycopy(src, low, dest, destLow, length);
            return;
        }

(2)范围排序

    /**
     * Sorts the specified range of the specified array of objects into
     * ascending order, according to the
     * {@linkplain Comparable natural ordering} of its
     * elements.  The range to be sorted extends from index
     * {@code fromIndex}, inclusive, to index {@code toIndex}, exclusive.
     * (If {@code fromIndex==toIndex}, the range to be sorted is empty.)  All
     * elements in this range must implement the {@link Comparable}
     * interface.  Furthermore, all elements in this range must be <i>mutually
     * comparable</i> (that is, {@code e1.compareTo(e2)} must not throw a
     * {@code ClassCastException} for any elements {@code e1} and
     * {@code e2} in the array).
     *
     * <p>This sort is guaranteed to be <i>stable</i>:  equal elements will
     * not be reordered as a result of the sort.
     *
     * <p>Implementation note: This implementation is a stable, adaptive,
     * iterative mergesort that requires far fewer than n lg(n) comparisons
     * when the input array is partially sorted, while offering the
     * performance of a traditional mergesort when the input array is
     * randomly ordered.  If the input array is nearly sorted, the
     * implementation requires approximately n comparisons.  Temporary
     * storage requirements vary from a small constant for nearly sorted
     * input arrays to n/2 object references for randomly ordered input
     * arrays.
     *
     * <p>The implementation takes equal advantage of ascending and
     * descending order in its input array, and can take advantage of
     * ascending and descending order in different parts of the the same
     * input array.  It is well-suited to merging two or more sorted arrays:
     * simply concatenate the arrays and sort the resulting array.
     *
     * <p>The implementation was adapted from Tim Peters's list sort for Python
     * (<a href="http://svn.python.org/projects/python/trunk/Objects/listsort.txt">
     * TimSort</a>).  It uses techniques from Peter McIlroy's "Optimistic
     * Sorting and Information Theoretic Complexity", in Proceedings of the
     * Fourth Annual ACM-SIAM Symposium on Discrete Algorithms, pp 467-474,
     * January 1993.
     *
     * @param a the array to be sorted
     * @param fromIndex the index of the first element (inclusive) to be
     *        sorted
     * @param toIndex the index of the last element (exclusive) to be sorted
     * @throws IllegalArgumentException if {@code fromIndex > toIndex} or
     *         (optional) if the natural ordering of the array elements is
     *         found to violate the {@link Comparable} contract
     * @throws ArrayIndexOutOfBoundsException if {@code fromIndex < 0} or
     *         {@code toIndex > a.length}
     * @throws ClassCastException if the array contains elements that are
     *         not <i>mutually comparable</i> (for example, strings and
     *         integers).
     */
    public static void sort(Object[] a, int fromIndex, int toIndex) {
        rangeCheck(a.length, fromIndex, toIndex);
        if (LegacyMergeSort.userRequested)
            legacyMergeSort(a, fromIndex, toIndex);
        else
            ComparableTimSort.sort(a, fromIndex, toIndex, null, 0, 0);
    }

与数组全排序一致,不同点在于 (1) 增加了范围 fromIndex(包含在内) 到 toIndex(排除在外)。
首先范围检测,若参数有误,抛出异常,源码如下:

    /**
     * Checks that {@code fromIndex} and {@code toIndex} are in
     * the range and throws an exception if they aren't.
     */
    private static void rangeCheck(int arrayLength, int fromIndex, int toIndex) {
        if (fromIndex > toIndex) {
            throw new IllegalArgumentException(
                    "fromIndex(" + fromIndex + ") > toIndex(" + toIndex + ")");
        }
        if (fromIndex < 0) {
            throw new ArrayIndexOutOfBoundsException(fromIndex);
        }
        if (toIndex > arrayLength) {
            throw new ArrayIndexOutOfBoundsException(toIndex);
        }
    }

(2)legacyMergeSort参数增加,fromIndex,toIndex。 源码如下:

        /** To be removed in a future release. */
    private static void legacyMergeSort(Object[] a,
                                        int fromIndex, int toIndex) {
        Object[] aux = copyOfRange(a, fromIndex, toIndex);
        mergeSort(aux, a, fromIndex, toIndex, -fromIndex);
    }

范围复制,生成待排序的数组,源码如下:

    /**
     * Copies the specified range of the specified array into a new array.
     * The initial index of the range (<tt>from</tt>) must lie between zero
     * and <tt>original.length</tt>, inclusive.  The value at
     * <tt>original[from]</tt> is placed into the initial element of the copy
     * (unless <tt>from == original.length</tt> or <tt>from == to</tt>).
     * Values from subsequent elements in the original array are placed into
     * subsequent elements in the copy.  The final index of the range
     * (<tt>to</tt>), which must be greater than or equal to <tt>from</tt>,
     * may be greater than <tt>original.length</tt>, in which case
     * <tt>null</tt> is placed in all elements of the copy whose index is
     * greater than or equal to <tt>original.length - from</tt>.  The length
     * of the returned array will be <tt>to - from</tt>.
     * <p>
     * The resulting array is of exactly the same class as the original array.
     *
     * @param <T> the class of the objects in the array
     * @param original the array from which a range is to be copied
     * @param from the initial index of the range to be copied, inclusive
     * @param to the final index of the range to be copied, exclusive.
     *     (This index may lie outside the array.)
     * @return a new array containing the specified range from the original array,
     *     truncated or padded with nulls to obtain the required length
     * @throws ArrayIndexOutOfBoundsException if {@code from < 0}
     *     or {@code from > original.length}
     * @throws IllegalArgumentException if <tt>from &gt; to</tt>
     * @throws NullPointerException if <tt>original</tt> is null
     * @since 1.6
     */
    @SuppressWarnings("unchecked")
    public static <T> T[] copyOfRange(T[] original, int from, int to) {
        return copyOfRange(original, from, to, (Class<? extends T[]>) original.getClass());
    }


copyOfRange(U[] original, int from, int to, Class<? extends T[]> newType) 源码如下:

    /**
     * Copies the specified range of the specified array into a new array.
     * The initial index of the range (<tt>from</tt>) must lie between zero
     * and <tt>original.length</tt>, inclusive.  The value at
     * <tt>original[from]</tt> is placed into the initial element of the copy
     * (unless <tt>from == original.length</tt> or <tt>from == to</tt>).
     * Values from subsequent elements in the original array are placed into
     * subsequent elements in the copy.  The final index of the range
     * (<tt>to</tt>), which must be greater than or equal to <tt>from</tt>,
     * may be greater than <tt>original.length</tt>, in which case
     * <tt>null</tt> is placed in all elements of the copy whose index is
     * greater than or equal to <tt>original.length - from</tt>.  The length
     * of the returned array will be <tt>to - from</tt>.
     * The resulting array is of the class <tt>newType</tt>.
     *
     * @param <U> the class of the objects in the original array
     * @param <T> the class of the objects in the returned array
     * @param original the array from which a range is to be copied
     * @param from the initial index of the range to be copied, inclusive
     * @param to the final index of the range to be copied, exclusive.
     *     (This index may lie outside the array.)
     * @param newType the class of the copy to be returned
     * @return a new array containing the specified range from the original array,
     *     truncated or padded with nulls to obtain the required length
     * @throws ArrayIndexOutOfBoundsException if {@code from < 0}
     *     or {@code from > original.length}
     * @throws IllegalArgumentException if <tt>from &gt; to</tt>
     * @throws NullPointerException if <tt>original</tt> is null
     * @throws ArrayStoreException if an element copied from
     *     <tt>original</tt> is not of a runtime type that can be stored in
     *     an array of class <tt>newType</tt>.
     * @since 1.6
     */
    public static <T,U> T[] copyOfRange(U[] original, int from, int to, Class<? extends T[]> newType) {
        int newLength = to - from;
        if (newLength < 0)
            throw new IllegalArgumentException(from + " > " + to);
        @SuppressWarnings("unchecked")
        T[] copy = ((Object)newType == (Object)Object[].class)
            ? (T[]) new Object[newLength]
            : (T[]) Array.newInstance(newType.getComponentType(), newLength);
        System.arraycopy(original, from, copy, 0,
                         Math.min(original.length - from, newLength));
        return copy;
    }

ComparableTimSort

最初是由Tim Peters于2002年在Python语言中提出的。 TimSort 是一个归并排序做了大量优化的版本。对归并排序排在已经反向排好序的输入时表现O(n2)的特点做了特别优化。对已经正向排好序的输入减少回溯。对两种情况混合(一会升序,一会降序)的输入处理比较好。
ComparableTimSort源码如下:

/*
     * The next method (package private and static) constitutes the
     * entire API of this class.
     */

    /**
     * Sorts the given range, using the given workspace array slice
     * for temp storage when possible. This method is designed to be
     * invoked from public methods (in class Arrays) after performing
     * any necessary array bounds checks and expanding parameters into
     * the required forms.
     *
     * @param a the array to be sorted
     * @param lo the index of the first element, inclusive, to be sorted
     * @param hi the index of the last element, exclusive, to be sorted
     * @param work a workspace array (slice)
     * @param workBase origin of usable space in work array
     * @param workLen usable size of work array
     * @since 1.8
     */
    static void sort(Object[] a, int lo, int hi, Object[] work, int workBase, int workLen) {
        assert a != null && lo >= 0 && lo <= hi && hi <= a.length;

        int nRemaining  = hi - lo;
        if (nRemaining < 2)
            return;  // Arrays of size 0 and 1 are always sorted

        // If array is small, do a "mini-TimSort" with no merges
        if (nRemaining < MIN_MERGE) {
            int initRunLen = countRunAndMakeAscending(a, lo, hi);
            binarySort(a, lo, hi, lo + initRunLen);
            return;
        }

        /**
         * March over the array once, left to right, finding natural runs,
         * extending short natural runs to minRun elements, and merging runs
         * to maintain stack invariant.
         */
        ComparableTimSort ts = new ComparableTimSort(a, work, workBase, workLen);
        int minRun = minRunLength(nRemaining);
        do {
            // Identify next run
            int runLen = countRunAndMakeAscending(a, lo, hi);

            // If run is short, extend to min(minRun, nRemaining)
            if (runLen < minRun) {
                int force = nRemaining <= minRun ? nRemaining : minRun;
                binarySort(a, lo, lo + force, lo + runLen);
                runLen = force;
            }

            // Push run onto pending-run stack, and maybe merge
            ts.pushRun(lo, runLen);
            ts.mergeCollapse();

            // Advance to find next run
            lo += runLen;
            nRemaining -= runLen;
        } while (nRemaining != 0);

        // Merge all remaining runs to complete sort
        assert lo == hi;
        ts.mergeForceCollapse();
        assert ts.stackSize == 1;
    }

首先解释一下该方法的注释:

  • 这个包私有且静态的方法构成了该类的整个 API
  • 在可能的情况下,使用给定的工作空间数组切片当作临时存储空间进行排序
  • 该方法设计在Arrays类中共有调用(在进行一些列的边界检查和参数扩展后)
  • a 待排序的数组
  • lo 待排序数组中第一个元素索引(包含该索引)
  • hi 待排序数组中最后一个元素索引(不包含该索引)
  • work 工作空间数组(切片)
  • workBase 工作数组中可用空间的起源
  • workLen 工作数组可用大小

来一步步分析:

    int nRemaining  = hi - lo;
    if (nRemaining < 2)
        return;  // Arrays of size 0 and 1 are always sorted

获取待排序数组元素个数,若小于2,则无需排序。


    /**
     * This is the minimum sized sequence that will be merged.  Shorter
     * sequences will be lengthened by calling binarySort.  If the entire
     * array is less than this length, no merges will be performed.
     *
     * This constant should be a power of two.  It was 64 in Tim Peter's C
     * implementation, but 32 was empirically determined to work better in
     * this implementation.  In the unlikely event that you set this constant
     * to be a number that's not a power of two, you'll need to change the
     * {@link #minRunLength} computation.
     *
     * If you decrease this constant, you must change the stackLen
     * computation in the TimSort constructor, or you risk an
     * ArrayOutOfBounds exception.  See listsort.txt for a discussion
     * of the minimum stack length required as a function of the length
     * of the array being sorted and the minimum merge sequence length.
     */
    private static final int MIN_MERGE = 32;

    // If array is small, do a "mini-TimSort" with no merges
    if (nRemaining < MIN_MERGE) {
        int initRunLen = countRunAndMakeAscending(a, lo, hi);
        binarySort(a, lo, hi, lo + initRunLen);
        return;
    }

传入的待排序数组若小于阈值MIN_MERGE(Java实现中为32,Python实现中为64),则调用 binarySort,这是一个不包含合并操作的 mini-TimSort。
a) 从数组开始处找到一组升序或降序(找到后翻转)的子数组,返回子数组最后一个元素的索引 b) Binary Sort:使用二分查找的方法将后续的数插入之前的已排序数组,binarySort 对数组 a[lo:hi] 进行排序,并且a[lo:start] 是已经排好序的。算法的思路是对a[start:hi] 中的元素,每次使用binarySearch 为它在 a[lo:start] 中找到相应位置,并插入。
我们来看看 countRunAndMakeAscending 是如何从数组开始处找到一组升序或降序(找到后翻转)的子数组,并且返回子数组最后一个元素的索引。
源码如下:

    /**
     * Returns the length of the run beginning at the specified position in
     * the specified array and reverses the run if it is descending (ensuring
     * that the run will always be ascending when the method returns).
     *
     * A run is the longest ascending sequence with:
     *
     *    a[lo] <= a[lo + 1] <= a[lo + 2] <= ...
     *
     * or the longest descending sequence with:
     *
     *    a[lo] >  a[lo + 1] >  a[lo + 2] >  ...
     *
     * For its intended use in a stable mergesort, the strictness of the
     * definition of "descending" is needed so that the call can safely
     * reverse a descending sequence without violating stability.
     *
     * @param a the array in which a run is to be counted and possibly reversed
     * @param lo index of the first element in the run
     * @param hi index after the last element that may be contained in the run.
              It is required that {@code lo < hi}.
     * @return  the length of the run beginning at the specified position in
     *          the specified array
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static int countRunAndMakeAscending(Object[] a, int lo, int hi) {
        assert lo < hi;
        int runHi = lo + 1;
        if (runHi == hi)
            return 1;

        // Find end of run, and reverse range if descending
        if (((Comparable) a[runHi++]).compareTo(a[lo]) < 0) { // Descending
            while (runHi < hi && ((Comparable) a[runHi]).compareTo(a[runHi - 1]) < 0)
                runHi++;
            reverseRange(a, lo, runHi);
        } else {                              // Ascending
            while (runHi < hi && ((Comparable) a[runHi]).compareTo(a[runHi - 1]) >= 0)
                runHi++;
        }

        return runHi - lo;
    }

countRunAndMakeAscending方法接收的参数有三个,待查找的数组,起始索引,终点索引。 基本思想是:判断第二个数和第一个数的大小来确定是升序还是降序,若第二个数小于第一个数,则为降序,然后在while循环中,若后后面的数依旧小于前面的数,则runHi++计数,直到不满足降序;然后调用reverseRange进行反转,变成升序。若第二个数大于第一个数,则为升序,然后在while循环中,若后面的数依旧不小于前面的数,则runHi++计数,直到不满足升序。 返回runHi - lo也就是满足升序或者降序的个数。且这个数组是从第一个开始的。最后都是升序数组。
所以在执行binarySort方法的时候只需要将lo + initRunLen后的数依此插入前面的升序序列中即可。

若待排序数组个数不小于MIN_MERGE时,直接进行排序。
源码如下:

/**
         * March over the array once, left to right, finding natural runs,
         * extending short natural runs to minRun elements, and merging runs
         * to maintain stack invariant.
         */
        ComparableTimSort ts = new ComparableTimSort(a, work, workBase, workLen);
        int minRun = minRunLength(nRemaining);
        do {
            // Identify next run
            int runLen = countRunAndMakeAscending(a, lo, hi);

            // If run is short, extend to min(minRun, nRemaining)
            if (runLen < minRun) {
                int force = nRemaining <= minRun ? nRemaining : minRun;
                binarySort(a, lo, lo + force, lo + runLen);
                runLen = force;
            }

            // Push run onto pending-run stack, and maybe merge
            ts.pushRun(lo, runLen);
            ts.mergeCollapse();

            // Advance to find next run
            lo += runLen;
            nRemaining -= runLen;
        } while (nRemaining != 0);

        // Merge all remaining runs to complete sort
        assert lo == hi;
        ts.mergeForceCollapse();
        assert ts.stackSize == 1;

来一步步分析
计算最小合并序列长度minRun,根据待排序元素的个数nRemaining,计算出一个minRun(后面会详细讲解计算细节)。

int minRun = minRunLength(nRemaining);

接着来分析dowhile循环中的代码:
(1)找出升序:找出一个升序子数组,或者一个经过反转的降序子数组。子数组个数为runLen。
源码如下:

    // Identify next run
    int runLen = countRunAndMakeAscending(a, lo, hi);

    private static int countRunAndMakeAscending(Object[] a, int lo, int hi) {
        assert lo < hi;
        int runHi = lo + 1;
        if (runHi == hi)
            return 1;

        // Find end of run, and reverse range if descending
        if (((Comparable) a[runHi++]).compareTo(a[lo]) < 0) { // Descending
            while (runHi < hi && ((Comparable) a[runHi]).compareTo(a[runHi - 1]) < 0)
                runHi++;
            reverseRange(a, lo, runHi);
        } else {                              // Ascending
            while (runHi < hi && ((Comparable) a[runHi]).compareTo(a[runHi - 1]) >= 0)
                runHi++;
        }

        return runHi - lo;
    }

(2)构造合并序列run:比较runLen与minRun的长度,如果:
a. runLen < minRun, 用二分插入排序,把原集合中runLen后面的元素补足到min(minRun,nRemaining)个元素。构造出一个集合命名为run,并且runLen= min(minRun,nRemaining)。 b. runLen >= minRun,run取runLen个元素。
源码如下:

    // If run is short, extend to min(minRun, nRemaining)
    if (runLen < minRun) {
        int force = nRemaining <= minRun ? nRemaining : minRun;
        binarySort(a, lo, lo + force, lo + runLen);
        runLen = force;
    }

(3)压栈等待合并:将步骤3中,构建的run压入名runBase的栈中。
源码如下:

    // Push run onto pending-run stack, and maybe merge
    ts.pushRun(lo, runLen);

    /**
     * A stack of pending runs yet to be merged.  Run i starts at
     * address base[i] and extends for len[i] elements.  It's always
     * true (so long as the indices are in bounds) that:
     *
     *     runBase[i] + runLen[i] == runBase[i + 1]
     *
     * so we could cut the storage for this, but it's a minor amount,
     * and keeping all the info explicit simplifies the code.
     */
    private int stackSize = 0;  // Number of pending runs on stack

    /**
     * Pushes the specified run onto the pending-run stack.
     *
     * @param runBase index of the first element in the run
     * @param runLen  the number of elements in the run
     */
    private void pushRun(int runBase, int runLen) {
        this.runBase[stackSize] = runBase;
        this.runLen[stackSize] = runLen;
        stackSize++;
    }

(4)构造runBase的阶梯结构:将runBase中的run进行合并,根据run中的元素个数栈构造出上小下大的阶梯结构。这个阶梯要求:阶梯内任意3级从下向上的run的元素个数(run0, run1, run2)满足run0>run1+run2 && run1>run2。不满足条件的run就会被合并,合并的原则如是:每一次有新的run被压入栈,那么我们比较栈顶3个run的元素个数(栈顶的run是新压入栈的run)。

a. 如果run0<=run1+run2,那么run1与run0和run2中元素较少的那个run合并。
b. 如果run0>run1+run2,但run1<=run2,那么run1与run2合并。
c. 重复步骤a和b,直到满足条件run0>run1+run2 && run1>run2或者baseRun中不足3个run。

(5)重复步骤(1)-(4),直到所有的元素都被压入栈中(nRemaining==0)。 (6)栈合并:将稳定结构的栈runBase进行合并,直到所有的run都被合并,合并后的run就是最后的排序结果。当然在合并的过程中,通常情况下用栈顶的run与下一个run进行合并,比如有5个润,run4与run3合并。如果出现了run4>run2的情况,那么我们合并run3与run2(合并的原则是合并相邻两个短的run)。

疑问:第4步构造runBase的时候,我有个疑问。为什么这个阶梯一定要满足run0>run1+run2&& run1>run2这样的条件呢?
模拟MergeSort的合并的部分。用run0>run1+run2&& run1>run2这条件构造出与MergeSort类似的楼梯结构。

如何计算minRun:首先你要知道minRun的长度是动态的,他是根据每次nRemaining计算出来的。计算方式如下:

a. 如果nRemaining<32那么,minRun就是nRemaining。
b. 如果nRemaining>=32,那么minRun的值会在16到32之间。为了实现这一点,我们对nRemaining不断的除以2,直到得到一个k,16<=k<=32。

  • 如果除以2的过程中,出现过不能整除的情况,那么最终minRun=k+1。

  • 如果除以2的过程中,一直能够整除,那么最终minRun=k。

这里有一个很简单的例子:分别是nRemaining为100和102的例子。

nRemaining=100:不断除以2;100->50->25;k=25;16<=25<=32;除以2过程中都能除尽,k=25。

nRemaining=102:不断除以2;102->51->25;k=25;16<=25<=32;51/2不能整除。minRun=k+1=26。

计算minRun是为了避免run过小,合并过于频繁的情况。

还有一个优化放在了两个集合之间的合并上

  1. 我们现在有两个进行合并的栈,我们从上到下把他们称为R2和R1。R2和R1已经是有序的集合,记住这一点,这很重要。

  2. 找出R2中第一个元素在R1中的位置。在这个位置之前的R1中的元素,不需要参加比较(图1 R1中灰色部分),因为他们一定比R2中任意元素小。

  3. 找出R1中最后一个元素在R2中的位置。在这个位置之后的R2中的元素也不需要参与比较(图1 R2中灰色部分),因为他们一定比R1中的任意元素大。

  4. 划分完成后,蓝色部分用MergeSort方式合并。在合并过程中,发现R1连续7次比R2小(或者大),那么我们再次重复步骤2和3,划分出新蓝色部分。直到不满足连续7次小(或者大),则重新采用MergeSort的方式进行合并。

合并

图1

timsort是工业级的优化。增加了排序的自适应性,避免排序向MergeSort的最坏时间复杂度nlog2n靠拢。

  1. 他们能识别降序进行翻转,获取待比较序列中连续的升序列。

  2. 动态计算minRun以减少合并次数。

  3. 把要合并的run模拟压栈成mergeSort的样子。

  4. 自动排除一些不需要比较的头部和尾部。

  5. 在一个序列总比另外一个序列小的时候,他会猜测会有更多的数据满足这个情况,再次划出那些不用去比较的头部和尾部。

定制排序

定制排序逻辑上类似于自然排序,在列表中类未实现Comparable接口时,使用实现Comparator来实现定制排序。这里重点分析TimSort源码:


import java.util.Arrays;
import java.util.Comparator;

/**
 * Created by yxf on 16-5-30.
 * 这里对TimSort算法在java中的实现做了注释,部分实现逻辑相似的注释没有处理,直接是原来的注释。
 *
 */
class TimSort<T> {
    /**
     * 参与序列合并的最短长度。比这个更短的序列将会通过二叉插入排序加长。如果整个数组都比这个短,那就不会经过归并排序。
     * <p/>
     * 这个常量的值必须2的幂。Tim Perter 在C语言中的实现版本使用了64,但是根据经验这里的版本使用32更合适。在最坏的情况下,使用了非2的幂赋值,就必须要重写 {@link # minRunLength}这个方法。
     * 如果减小了这个值,就需要在构造方法中减小stackLen的值,不然将面临数组越界的风险。
     */
    private static final int MIN_MERGE = 32;

    /**
     * 将要被排序的数组
     */
    private final T[] a;

    /**
     * 这次排序的比较器
     */
    private final Comparator<? super T> c;

    /**
     * 判断数据顺序连续性的阈值
     * 后面结合代码看,会容易理解一点
     */
    private static final int MIN_GALLOP = 7;

    private int minGallop = MIN_GALLOP;

    /**
     * 归并排序中临时数组的最大长度,数组的长度也可以根据需求增长。
     * 与C语言中的实现方式不同,对于相对较小的数组,我们不用这么大的临时数组。这点改变对性能有显著的影响
     */
    private static final int INITIAL_TMP_STORAGE_LENGTH = 256;


    /**
     * 临时数组,根据泛型的内容可知,实际的存储要用Object[],不能用T[]
     */
    private T[] tmp;

    /**
     * 栈中待归并的run的数量。一个run i的范围从runBase[i]开始,一直延续到runLen[i]。
     * 下面这个根据前一个run的结尾总是下一个run的开头。
     * 所以下面的等式总是成立:
     * runBase[i] + runLen[i] == runBase[i+1];
     **/

    private int stackSize = 0; //栈中run的数量
    private final int[] runBase;
    private final int[] runLen;

    /**
     * 这个构造方法是私有的所以只能在类内部创建。
     * 创建这个实例是为了保存一次排序过程中的状态变量。
     */
    private TimSort(T[] a, Comparator<? super T> c) {
        this.a = a;
        this.c = c;

        // 这里是分配临时数组的空间。SuppressWainings是为了消除泛型数组转型的警告
        // 临时数组的长度写的很精炼,不明白的自己熟悉一下java位操作。
        // 结果就是 数组长度的一半或者是INITIAL_TMP_STORAGE_LENGTH
        int len = a.length;
        @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
        T[] newArray = (T[]) new Object[len < 2 * INITIAL_TMP_STORAGE_LENGTH ?
                len >>> 1 : INITIAL_TMP_STORAGE_LENGTH];
        tmp = newArray;

        /**
         * 这里是分配储存run的栈的空间,它不能在运行时扩展。
         * C语言版本中的栈一直使用固定值85,但这样对一些中小数组来说有些浪费资源。所以,
         * 这个版本我们使用了相对较小容量的栈。
         * 在MIN_MERGE减小的时候,这些‘魔法数’可能面临数组越界的风险。
         * */
        int stackLen = (len < 120 ? 5 :
                                 len < 1542 ? 10 :
                                 len < 119151 ? 24 : 40);
        runBase = new int[stackLen];
        runLen = new int[stackLen];
    }

    static <T> void sort(T[] a, Comparator<? super T> c) {
        sort(a, 0, a.length, c);
    }

    static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c) {
        if (c == null) {
            Arrays.sort(a, lo, hi);
            return;
        }

        rangeCheck(a.length, lo, hi);
        int nRemaining = hi - lo;
        if (nRemaining < 2)
            return;  // 长度是0或者1 就不需要排序了。

        // 小于MIN_MERGE长度的数组就不用归并排序了,杀鸡焉用宰牛刀
        if (nRemaining < MIN_MERGE) {
            int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
            binarySort(a, lo, hi, lo + initRunLen, c);
            return;
        }

        /**
         * March over the array once, left to right, finding natural runs,
         * extending short natural runs to minRun elements, and merging runs
         * to maintain stack invariant.
         *
         * 下面将进入算法流程的主体,首先理解源码注释中run的含义,可以理解为升序序列的意思。
         *
         * 从左到右,遍历一边数组。找出自然排好序的序列(natural run),把短的自然升序序列通过二叉查找排序
         * 扩展到minRun长度的升序序列。最后合并栈中的所有升序序列,保证规则不变。
         */
        TimSort<T> ts = new TimSort<>(a, c); //新建TimSort对象,保存栈的状态
        int minRun = minRunLength(nRemaining);
        do {
            //跟二叉查找插入排序一样,先找自然升序序列
            int runLen = countRunAndMakeAscending(a, lo, hi, c);

            // If run is short, extend to min(minRun, nRemaining)
            // 如果 自然升序的长度不够minRun,就把 min(minRun,nRemaining)长度的范围内的数列排好序
            if (runLen < minRun) {
                int force = nRemaining <= minRun ? nRemaining : minRun;
                binarySort(a, lo, lo + force, lo + runLen, c);
                runLen = force;
            }

            // Push run onto pending-run stack, and maybe merge
            //把已经排好序的数列压入栈中,检查是不是需要合并
            ts.pushRun(lo, runLen);
            ts.mergeCollapse();

            //把指针后移runLen距离,准备开始下一轮片段的排序
            lo += runLen;
            //剩下待排序的数量相应的减少 runLen
            nRemaining -= runLen;
        } while (nRemaining != 0);

        // Merge all remaining runs to complete sort
        assert lo == hi;
        ts.mergeForceCollapse();
        assert ts.stackSize == 1;
    }

    /**
     * 被优化的二分插入排序
     *
     * 使用二分插入排序算法给指定一部分数组排序。这是给小数组排序的最佳方案。最差情况下
     * 它需要 O(n log n) 次比较和 O(n^2)次数据移动。
     *
     * 如果开始的部分数据是有序的那么我们可以利用它们。这个方法默认数组中的位置lo(包括在内)到
     * start(不包括在内)的范围内是已经排好序的。
     *
     * @param a     被排序的数组
     * @param lo    待排序范围内的首个元素的位置
     * @param hi    待排序范围内最后一个元素的后一个位置
     * @param start 待排序范围内的第一个没有排好序的位置,确保 (lo <= start <= hi)
     * @param c     本次排序的比较器
     */
    @SuppressWarnings("fallthrough")
    private static <T> void binarySort(T[] a, int lo, int hi, int start,
                                       Comparator<? super T> c) {
        assert lo <= start && start <= hi;
        //如果start 从起点开始,做下预处理;也就是原本就是无序的。
        if (start == lo)
            start++;
        //从start位置开始,对后面的所有元素排序
        for (; start < hi; start++) {
            //pivot 代表正在参与排序的值,
            T pivot = a[start];

            // Set left (and right) to the index where a[start] (pivot) belongs
            // 把pivot应当插入的设置的边界设置为left和right
            int left = lo;
            int right = start;
            assert left <= right;

            /*
             * 保证的逻辑:
             *   pivot >= all in [lo, left).
             *   pivot <  all in [right, start).
             */
            while (left < right) {
                int mid = (left + right) >>> 1;
                if (c.compare(pivot, a[mid]) < 0)
                    right = mid;
                else
                    left = mid + 1;
            }
            assert left == right;

            /**
             * 此时,仍然能保证:
             * pivot >= [lo, left) && pivot < [left,start)
             * 所以,pivot的值应当在left所在的位置,然后需要把[left,start)范围内的内容整体右移一位
             * 腾出空间。如果pivot与区间中的某个值相等,left指正会指向重复的值的后一位,
             * 所以这里的排序是稳定的。
             */
            int n = start - left;  //需要移动的范围的长度

            // switch语句是一条小优化,1-2个元素的移动就不需要System.arraycopy了。
            // (这代码写的真是简洁,switch原来可以这样用)
            switch (n) {
                case 2:
                    a[left + 2] = a[left + 1];
                case 1:
                    a[left + 1] = a[left];
                    break;
                default:
                    System.arraycopy(a, left, a, left + 1, n);
            }
            //移动过之后,把pivot的值放到应该插入的位置,就是left的位置了
            a[left] = pivot;
        }
    }

    /**
     * 这一段代码是TimSort算法中的一个小优化,它利用了数组中前面一段已有的顺序。
     * 如果是升序,直接返回统计结果;如果是降序,在返回之前,将这段数列倒置,
     * 以确保这断序列从首个位置到此位置的序列都是升序的。
     * 返回的结果是这种两种形式的,lo是这段序列的开始位置。
     *
     * A run is the longest ascending sequence with:
     *
     * a[lo] <= a[lo + 1] <= a[lo + 2] <= ...
     *
     * or the longest descending sequence with:
     *
     * a[lo] >  a[lo + 1] >  a[lo + 2] >  ...
     *
     * 为了保证排序的稳定性,这里要使用严格的降序,这样才能保证相等的元素不参与倒置子序列的过程,
     * 保证它们原本的顺序不被打乱。
     *
     * @param a  参与排序的数组
     * @param lo run中首个元素的位置
     * @param hi run中最后一个元素的后面一个位置,需要确保lo<hi
     * @param c  本次排序的比较器
     * @return 从首个元素开始的最长升序子序列的结尾位置+1 or 严格的降序子序列的结尾位置+1。
     */
    private static <T> int countRunAndMakeAscending(T[] a, int lo, int hi,
                                                    Comparator<? super T> c) {
        assert lo < hi;
        int runHi = lo + 1;
        if (runHi == hi)
            return 1;

        // 找出最长升序序的子序列,如果降序,倒置之
        if (c.compare(a[runHi++], a[lo]) < 0) { // 前两个元素是降序,就按照降序统计
            while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0)
                runHi++;
            reverseRange(a, lo, runHi);
        } else {                              // 前两个元素是升序,按照升序统计
            while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0)
                runHi++;
        }

        return runHi - lo;
    }

    /**
     * 倒置数组中一段范围的元素
     *
     * @param a  指定数组
     * @param lo 这段范围的起始位置
     * @param hi 这段范围的终点位置的后一个位置
     */
    private static void reverseRange(Object[] a, int lo, int hi) {
        hi--;
        while (lo < hi) {
            Object t = a[lo];
            a[lo++] = a[hi];
            a[hi--] = t;
        }
    }

    /**
     * 返回参与合并的最小长度,如果自然排序的长度,小于此长度,那么就通过二分查找排序扩展到
     * 此长度。{@link #binarySort}.
     *
     * 粗略的讲,计算结果是这样的:
     *
     * 如果 n < MIN_MERGE, 直接返回 n。(太小了,不值得做复杂的操作);
     * 如果 n 正好是2的幂,返回 n / 2;
     * 其它情况下 返回一个数 k,满足 MIN_MERGE/2 <= k <= MIN_MERGE,
     * 这样结果就能保证 n/k 非常接近但小于一个2的幂。
     * 这个数字实际上是一种空间与时间的优化。
     *
     * @param n 参与排序的数组的长度
     * @return 参与归并的最短长度
     * 这段代码写得也很赞
     */
    private static int minRunLength(int n) {
        assert n >= 0;
        int r = 0;      // 只要不是 2的幂就会置 1
        while (n >= MIN_MERGE) {
            r |= (n & 1);
            n >>= 1;
        }
        return n + r;
    }

    /**
     * Pushes the specified run onto the pending-run stack.
     * 将指定的升序序列压入等待合并的栈中
     *
     * @param runBase 升序序列的首个元素的位置
     * @param runLen  升序序列的长度
     */
    private void pushRun(int runBase, int runLen) {
        this.runBase[stackSize] = runBase;
        this.runLen[stackSize] = runLen;
        stackSize++;
    }

    /**
     * 检查栈中待归并的升序序列,如果他们不满足下列条件就把相邻的两个序列合并,
     * 直到他们满足下面的条件
     *
     * 1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]
     * 2. runLen[i - 2] > runLen[i - 1]
     *
     * 每次添加新序列到栈中的时候都会执行一次这个操作。所以栈中的需要满足的条件
     * 需要靠调用这个方法来维护。
     *
     * 最差情况下,有点像玩2048。
     */
    private void mergeCollapse() {
        while (stackSize > 1) {
            int n = stackSize - 2;
            if (n > 0 && runLen[n - 1] <= runLen[n] + runLen[n + 1]) {
                if (runLen[n - 1] < runLen[n + 1])
                    n--;
                mergeAt(n);
            } else if (runLen[n] <= runLen[n + 1]) {
                mergeAt(n);
            } else {
                break; // Invariant is established
            }
        }
    }

    /**
     * 合并栈中所有待合并的序列,最后剩下一个序列。这个方法在整次排序中只执行一次
     */
    private void mergeForceCollapse() {
        while (stackSize > 1) {
            int n = stackSize - 2;
            if (n > 0 && runLen[n - 1] < runLen[n + 1])
                n--;
            mergeAt(n);
        }
    }

    /**
     * 在一个序列中,将一个指定的key,从左往右查找它应当插入的位置;如果序列中存在
     * 与key相同的值(一个或者多个),那返回这些值中最左边的位置。
     *
     * 推断: 统计概率的原因,随机数字来说,两个待合并的序列的尾假设是差不多大的,从尾开始
     * 做查找找到的概率高一些。仔细算一下,最差情况下,这种查找也是 log(n),所以这里没有
     * 用简单的二分查找。
     *
     * @param key  准备插入的key
     * @param a    参与排序的数组
     * @param base 序列范围的第一个元素的位置
     * @param len  整个范围的长度,一定有len > 0
     * @param hint 开始查找的位置,有0 <= hint <= len;越接近结果查找越快
     * @param c    排序,查找使用的比较器
     * @return 返回一个整数 k, 有 0 <= k <=n, 它满足 a[b + k - 1] < a[b + k]
     * 就是说key应当被放在 a[base + k],
     * 有 a[base,base+k) < key && key <=a [base + k, base + len)
     */
    private static <T> int gallopLeft(T key, T[] a, int base, int len, int hint,
                                      Comparator<? super T> c) {
        assert len > 0 && hint >= 0 && hint < len;
        int lastOfs = 0;
        int ofs = 1;
        if (c.compare(key, a[base + hint]) > 0) { // key > a[base+hint]
            // 遍历右边,直到 a[base+hint+lastOfs] < key <= a[base+hint+ofs]
            int maxOfs = len - hint;
            while (ofs < maxOfs && c.compare(key, a[base + hint + ofs]) > 0) {
                lastOfs = ofs;
                ofs = (ofs << 1) + 1;
                if (ofs <= 0)   // int overflow
                    ofs = maxOfs;
            }
            if (ofs > maxOfs)
                ofs = maxOfs;

            // 最终的ofs是这样确定的,满足条件 a[base+hint+lastOfs] < key <= a[base+hint+ofs]
            // 的一组
            // ofs:     1   3   7  15  31  63 2^n-1 ... maxOfs
            // lastOfs: 0   1   3   7  15  31 2^(n-1)-1  < ofs


            // 因为目前的offset是相对hint的,所以做相对变换
            lastOfs += hint;
            ofs += hint;
        } else { // key <= a[base + hint]
            // 遍历左边,直到[base+hint-ofs] < key <= a[base+hint-lastOfs]
            final int maxOfs = hint + 1;
            while (ofs < maxOfs && c.compare(key, a[base + hint - ofs]) <= 0) {
                lastOfs = ofs;
                ofs = (ofs << 1) + 1;
                if (ofs <= 0)   // int overflow
                    ofs = maxOfs;
            }
            if (ofs > maxOfs)
                ofs = maxOfs;
            // 确定ofs的过程与上面相同
            // ofs:     1   3   7  15  31  63 2^n-1 ... maxOfs
            // lastOfs: 0   1   3   7  15  31 2^(n-1)-1  < ofs

            // Make offsets relative to base
            int tmp = lastOfs;
            lastOfs = hint - ofs;
            ofs = hint - tmp;
        }
        assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;

        /*
         * 现在的情况是 a[base+lastOfs] < key <= a[base+ofs], 所以,key应当在lastOfs的
         * 右边,又不超过ofs。在base+lastOfs-1到 base+ofs范围内做一次二叉查找。
         */
        lastOfs++;
        while (lastOfs < ofs) {
            int m = lastOfs + ((ofs - lastOfs) >>> 1);

            if (c.compare(key, a[base + m]) > 0)
                lastOfs = m + 1;  // a[base + m] < key
            else
                ofs = m;          // key <= a[base + m]
        }
        assert lastOfs == ofs;    // so a[base + ofs - 1] < key <= a[base + ofs]
        return ofs;
    }

    /**
     * 与gallopLeft相似,不同的是如果发现key的值与某些元素相等,那返回这些值最后一个元素的位置的
     * 后一个位置
     *
     * @param key  需要查找待插入位置的那个值
     * @param a    待排序的数组
     * @param base 被查找的序列中第一个元素的位置
     * @param len  被查找的序列的长度
     * @param hint 开始查找的位置,0 <= hint < len.它越接近结果所在位置,查找越快。
     * @param c    本次排序的比较器
     * @return 一个整数 k,  满足0 <= k <= n 并且 a[b + k - 1] <= key < a[b + k]
     */
    private static <T> int gallopRight(T key, T[] a, int base, int len,
                                       int hint, Comparator<? super T> c) {
        assert len > 0 && hint >= 0 && hint < len;

        int ofs = 1;
        int lastOfs = 0;
        if (c.compare(key, a[base + hint]) < 0) {
            // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs]
            int maxOfs = hint + 1;
            while (ofs < maxOfs && c.compare(key, a[base + hint - ofs]) < 0) {
                lastOfs = ofs;
                ofs = (ofs << 1) + 1;
                if (ofs <= 0)   // int overflow
                    ofs = maxOfs;
            }
            if (ofs > maxOfs)
                ofs = maxOfs;

            // Make offsets relative to b
            int tmp = lastOfs;
            lastOfs = hint - ofs;
            ofs = hint - tmp;
        } else { // a[b + hint] <= key
            // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs]
            int maxOfs = len - hint;
            while (ofs < maxOfs && c.compare(key, a[base + hint + ofs]) >= 0) {
                lastOfs = ofs;
                ofs = (ofs << 1) + 1;
                if (ofs <= 0)   // int overflow
                    ofs = maxOfs;
            }
            if (ofs > maxOfs)
                ofs = maxOfs;

            // Make offsets relative to b
            lastOfs += hint;
            ofs += hint;
        }
        assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;

        /*
         * Now a[b + lastOfs] <= key < a[b + ofs], so key belongs somewhere to
         * the right of lastOfs but no farther right than ofs.  Do a binary
         * search, with invariant a[b + lastOfs - 1] <= key < a[b + ofs].
         */
        lastOfs++;
        while (lastOfs < ofs) {
            int m = lastOfs + ((ofs - lastOfs) >>> 1);

            if (c.compare(key, a[base + m]) < 0)
                ofs = m;          // key < a[b + m]
            else
                lastOfs = m + 1;  // a[b + m] <= key
        }
        assert lastOfs == ofs;    // so a[b + ofs - 1] <= key < a[b + ofs]
        return ofs;
    }

    /**
     * 合并在栈中位于i和i+1的两个相邻的升序序列。 i必须为从栈顶数,第二和第三个元素。
     * 换句话说i == stackSize - 2 || i == stackSize - 3
     *
     * @param i 待合并的第一个序列所在的位置
     */
    private void mergeAt(int i) {
        //校验
        assert stackSize >= 2;
        assert i >= 0;
        assert i == stackSize - 2 || i == stackSize - 3;
        //内部初始化
        int base1 = runBase[i];
        int len1 = runLen[i];
        int base2 = runBase[i + 1];
        int len2 = runLen[i + 1];
        assert len1 > 0 && len2 > 0;
        assert base1 + len1 == base2;

        /*
         * 记录合并后的序列的长度;如果i == stackSize - 3 就把最后一个序列的信息
         * 往前移一位,因为本次合并不关它的事。i+1对应的序列被合并到i序列中了,所以
         * i+1 数列可以消失了
         */
        runLen[i] = len1 + len2;
        if (i == stackSize - 3) {
            runBase[i + 1] = runBase[i + 2];
            runLen[i + 1] = runLen[i + 2];
        }
        //i+1消失了,所以长度也减下来了
        stackSize--;

        /*
         * 找出第二个序列的首个元素可以插入到第一个序列的什么位置,因为在此位置之前的序列已经就位了。
         * 它们可以被忽略,不参加归并。
         */
        int k = gallopRight(a[base2], a, base1, len1, 0, c);
        assert k >= 0;
        // 因为要忽略前半部分元素,所以起点和长度相应的变化
        base1 += k;
        len1 -= k;
        // 如果序列2 的首个元素要插入到序列1的后面,那就直接结束了,
        // !!! 因为序列2在数组中的位置本来就在序列1后面,也就是整个范围本来就是有序的!!!
        if (len1 == 0)
            return;

        /*
         * 跟上面相似,看序列1的最后一个元素(a[base1+len1-1])可以插入到序列2的什么位置(相对第二个序列起点的位置,非在数组中的位置),
         * 这个位置后面的元素也是不需要参与归并的。所以len2直接设置到这里,后面的元素直接忽略。
         */
        len2 = gallopLeft(a[base1 + len1 - 1], a, base2, len2, len2 - 1, c);
        assert len2 >= 0;
        if (len2 == 0)
            return;

        // 合并剩下的两个有序序列,并且这里为了节省空间,临时数组选用 min(len1,len2)的长度
		// 选择从低位开始还是高位开始比较是为了避免元素覆盖	
        // 优化的很细呢
        if (len1 <= len2)
            mergeLo(base1, len1, base2, len2);
        else
            mergeHi(base1, len1, base2, len2);
    }

    /**
     * 使用固定空间合并两个相邻的有序序列,保持数组的稳定性。
     * 使用本方法之前保证第一个序列的首个元素大于第二个序列的首个元素;第一个序列的末尾元素
     * 大于第二个序列的所有元素
     *
     * 为了性能,这个方法在len1 <= len2的时候调用;它的姐妹方法mergeHi应该在len1 >= len2
     * 的时候调用。len1==len2的时候随便调用哪个都可以
     *
     * @param base1 index of first element in first run to be merged
     * @param len1  length of first run to be merged (must be > 0)
     * @param base2 index of first element in second run to be merged
     *              (must be aBase + aLen)
     * @param len2  length of second run to be merged (must be > 0)
     */
    private void mergeLo(int base1, int len1, int base2, int len2) {
        assert len1 > 0 && len2 > 0 && base1 + len1 == base2;

        //将第一个序列放到临时数组中
        T[] a = this.a; // For performance
        T[] tmp = ensureCapacity(len1);
        System.arraycopy(a, base1, tmp, 0, len1);

        int cursor1 = 0;       // 临时数组指针
        int cursor2 = base2;   // 序列2的指针,参与归并的另一个序列
        int dest = base1;      // 保存结果的指针

        // 这里先把第二个序列的首个元素,移动到结果序列中的位置,然后处理那些不需要归并的情况
        a[dest++] = a[cursor2++];

        // 序列2只有一个元素的情况,把它移动到指定位置之后,剩下的临时数组
        // 中的所有序列1的元素全部copy到后面
        if (--len2 == 0) {
            System.arraycopy(tmp, cursor1, a, dest, len1);
            return;
        }
        // 序列1只有一个元素的情况,把它移动到最后一个位置,为了不覆盖,先把序列2中的元素
        // 全部移走。这个是因为序列1中的最后一个元素比序列2中的所有元素都大,这是该方法执行的条件
        if (len1 == 1) {
            System.arraycopy(a, cursor2, a, dest, len2);
            a[dest + len2] = tmp[cursor1]; // Last elt of run 1 to end of merge
            return;
        }

        Comparator<? super T> c = this.c;  // 本次排序的比较器

        int minGallop = this.minGallop;    //  "    "       "     "      "

        // 不了解break标签的同学要补补Java基本功了
        outer:
        while (true) {
            /*
            * 这里加了两个值来记录一个序列连续比另外一个大的次数,根据此信息,可以做出一些
            * 优化
            * */
            int count1 = 0; // 序列1 连续 比序列2大多少次
            int count2 = 0; // 序列2 连续 比序列1大多少次

            /*
            * 这里是直接的归并算法的合并的部分,这里会统计count1合count2,
            * 如果其中一个大于一个阈值,就会跳出循环
            * */
            do {
                assert len1 > 1 && len2 > 0;
                if (c.compare(a[cursor2], tmp[cursor1]) < 0) {
                    a[dest++] = a[cursor2++];
                    count2++;
                    count1 = 0;

                    // 序列2没有元素了就跳出整次合并
                    if (--len2 == 0)
                        break outer;
                } else {
                    a[dest++] = tmp[cursor1++];
                    count1++;
                    count2 = 0;
                    // 如果序列1只剩下最后一个元素了就可以跳出循环
                    if (--len1 == 1)
                        break outer;
                }

            /*
            * 这个判断相当于 count1 < minGallop && count2 <minGallop
            * 因为count1和count2总有一个为0
            * */
            } while ((count1 | count2) < minGallop);



            /*
             * 执行到这里的话,一个序列会连续的的比另一个序列大,那么这种连续性可能持续的
             * 更长。那么我们就按照这个逻辑试一试。直到这种连续性被打破。根据找到的长度,
             * 直接连续的copy就可以了,这样可以提高copy的效率。
             */
            do {
                assert len1 > 1 && len2 > 0;
                // gallopRight就是之前用过的那个方法
                count1 = gallopRight(a[cursor2], tmp, cursor1, len1, 0, c);
                if (count1 != 0) {
                    System.arraycopy(tmp, cursor1, a, dest, count1);
                    dest += count1;
                    cursor1 += count1;
                    len1 -= count1;
                    if (len1 <= 1) // 结尾处理退化的序列
                        break outer;
                }
                a[dest++] = a[cursor2++];
                if (--len2 == 0) //结尾处理退化的序列
                    break outer;

                count2 = gallopLeft(tmp[cursor1], a, cursor2, len2, 0, c);
                if (count2 != 0) {
                    System.arraycopy(a, cursor2, a, dest, count2);
                    dest += count2;
                    cursor2 += count2;
                    len2 -= count2;
                    if (len2 == 0)
                        break outer;
                }
                a[dest++] = tmp[cursor1++];
                if (--len1 == 1)
                    break outer;
                // 这里对连续性比另外一个大的阈值减少,这样更容易触发这段操作,
                // 应该是因为前面的数据表现好,后面的数据类似的可能性更高?
                minGallop--;
            } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP); //如果连续性还是很大的话,继续这样处理s


            if (minGallop < 0)
                minGallop = 0;

            //同样,这里如果跳出了那段循环,就证明数据的顺序程度不好,应当增加阈值,避免浪费资源
            minGallop += 2;
        }  //outer 结束


        this.minGallop = minGallop < 1 ? 1 : minGallop;  // Write back to field

        //这里处理收尾工作
        if (len1 == 1) {
            assert len2 > 0;
            System.arraycopy(a, cursor2, a, dest, len2);
            a[dest + len2] = tmp[cursor1]; //  Last elt of run 1 to end of merge
        } else if (len1 == 0) {
            //因为序列1中的最后一个值,比序列2中的所有值都大,所以,不可能序列1空了,序列2还有元素
            throw new IllegalArgumentException(
                    "Comparison method violates its general contract!");
        } else {
            assert len2 == 0;
            assert len1 > 1;
            System.arraycopy(tmp, cursor1, a, dest, len1);
        }
    }

    /**
     * Like mergeLo, except that this method should be called only if
     * len1 >= len2; mergeLo should be called if len1 <= len2.  (Either method
     * may be called if len1 == len2.)
     *
     * @param base1 index of first element in first run to be merged
     * @param len1  length of first run to be merged (must be > 0)
     * @param base2 index of first element in second run to be merged
     *              (must be aBase + aLen)
     * @param len2  length of second run to be merged (must be > 0)
     */
    private void mergeHi(int base1, int len1, int base2, int len2) {
        assert len1 > 0 && len2 > 0 && base1 + len1 == base2;

        // Copy second run into temp array
        T[] a = this.a; // For performance
        T[] tmp = ensureCapacity(len2);
        System.arraycopy(a, base2, tmp, 0, len2);

        int cursor1 = base1 + len1 - 1;  // Indexes into a
        int cursor2 = len2 - 1;          // Indexes into tmp array
        int dest = base2 + len2 - 1;     // Indexes into a

        // Move last element of first run and deal with degenerate cases
        a[dest--] = a[cursor1--];
        if (--len1 == 0) {
            System.arraycopy(tmp, 0, a, dest - (len2 - 1), len2);
            return;
        }
        if (len2 == 1) {
            dest -= len1;
            cursor1 -= len1;
            System.arraycopy(a, cursor1 + 1, a, dest + 1, len1);
            a[dest] = tmp[cursor2];
            return;
        }

        Comparator<? super T> c = this.c;  // Use local variable for performance
        int minGallop = this.minGallop;    //  "    "       "     "      "
        outer:
        while (true) {
            int count1 = 0; // Number of times in a row that first run won
            int count2 = 0; // Number of times in a row that second run won

            /*
             * Do the straightforward thing until (if ever) one run
             * appears to win consistently.
             */
            do {
                assert len1 > 0 && len2 > 1;
                if (c.compare(tmp[cursor2], a[cursor1]) < 0) {
                    a[dest--] = a[cursor1--];
                    count1++;
                    count2 = 0;
                    if (--len1 == 0)
                        break outer;
                } else {
                    a[dest--] = tmp[cursor2--];
                    count2++;
                    count1 = 0;
                    if (--len2 == 1)
                        break outer;
                }
            } while ((count1 | count2) < minGallop);

            /*
             * One run is winning so consistently that galloping may be a
             * huge win. So try that, and continue galloping until (if ever)
             * neither run appears to be winning consistently anymore.
             */
            do {
                assert len1 > 0 && len2 > 1;
                count1 = len1 - gallopRight(tmp[cursor2], a, base1, len1, len1 - 1, c);
                if (count1 != 0) {
                    dest -= count1;
                    cursor1 -= count1;
                    len1 -= count1;
                    System.arraycopy(a, cursor1 + 1, a, dest + 1, count1);
                    if (len1 == 0)
                        break outer;
                }
                a[dest--] = tmp[cursor2--];
                if (--len2 == 1)
                    break outer;

                count2 = len2 - gallopLeft(a[cursor1], tmp, 0, len2, len2 - 1, c);
                if (count2 != 0) {
                    dest -= count2;
                    cursor2 -= count2;
                    len2 -= count2;
                    System.arraycopy(tmp, cursor2 + 1, a, dest + 1, count2);
                    if (len2 <= 1)  // len2 == 1 || len2 == 0
                        break outer;
                }
                a[dest--] = a[cursor1--];
                if (--len1 == 0)
                    break outer;
                minGallop--;
            } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
            if (minGallop < 0)
                minGallop = 0;
            minGallop += 2;  // Penalize for leaving gallop mode
        }  // End of "outer" loop
        this.minGallop = minGallop < 1 ? 1 : minGallop;  // Write back to field

        if (len2 == 1) {
            assert len1 > 0;
            dest -= len1;
            cursor1 -= len1;
            System.arraycopy(a, cursor1 + 1, a, dest + 1, len1);
            a[dest] = tmp[cursor2];  // Move first elt of run2 to front of merge
        } else if (len2 == 0) {
            throw new IllegalArgumentException(
                    "Comparison method violates its general contract!");
        } else {
            assert len1 == 0;
            assert len2 > 0;
            System.arraycopy(tmp, 0, a, dest - (len2 - 1), len2);
        }
    }

    /**
     * 保证临时数组的大小能够容纳所有的临时元素,在需要的时候要扩展临时数组的大小。
     * 数组的大小程指数增长,来保证线性的复杂度。
     *
     * 一次申请步长太小,申请的次数必然会增多,浪费时间;一次申请的空间足够大,必然会
     * 浪费空间。正常情况下,归并排序的临时空间每次大的合并都会 * 2,
     * 最大长度不会超过数组长度的1/2。 这个长度于2 有着紧密的联系。
     *
     * @param minCapacity 临时数组需要的最小空间
     * @return tmp 临时数组
     */
    private T[] ensureCapacity(int minCapacity) {
        // 如果临时数组长度不够,那需要重新计算临时数组长度;
        // 如果长度够,直接返回当前临时数组
        if (tmp.length < minCapacity) {
            // 这里是计算最小的大于minCapacity的2的幂。方法不常见,这里分析一下。
            //
            // 假设有无符号整型 k,它的字节码如下:
            // 00000000 10000000 00000000 00000000  k
            // 00000000 11000000 00000000 00000000  k |= k >> 1;
            // 00000000 11110000 00000000 00000000  k |= k >> 2;
            // 00000000 11111111 00000000 00000000  k |= k >> 4;
            // 00000000 11111111 11111111 00000000  k |= k >> 8;
            // 00000000 11111111 11111111 11111111  k |= k >> 16
            // 上面的移位事实上只跟最高位有关系,移位的结果是最高位往后的bit全部变成了1
            // 最后 k++ 的结果 就是刚好是比 minCapacity 大的2的幂
            // 写的真是6
            int newSize = minCapacity;
            newSize |= newSize >> 1;
            newSize |= newSize >> 2;
            newSize |= newSize >> 4;
            newSize |= newSize >> 8;
            newSize |= newSize >> 16;
            newSize++;

            if (newSize < 0) // Not bloody likely! 估计作者在这里遇到bug了
                newSize = minCapacity;
            else
                newSize = Math.min(newSize, a.length >>> 1);

            @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
            T[] newArray = (T[]) new Object[newSize];
            tmp = newArray;
        }
        return tmp;
    }

    /**
     * 检查范围fromIndex到toIndex是否在数组内,如果不是抛异常
     *
     * @param arrayLen  整个数组的长度
     * @param fromIndex 该范围的起点
     * @param toIndex   该范围的终点
     * @throws IllegalArgumentException       if fromIndex > toIndex
     * @throws ArrayIndexOutOfBoundsException if fromIndex < 0 or toIndex > arrayLen
     */
    private static void rangeCheck(int arrayLen, int fromIndex, int toIndex) {
        if (fromIndex > toIndex)
            throw new IllegalArgumentException("fromIndex(" + fromIndex +
                    ") > toIndex(" + toIndex + ")");
        if (fromIndex < 0)
            throw new ArrayIndexOutOfBoundsException(fromIndex);
        if (toIndex > arrayLen)
            throw new ArrayIndexOutOfBoundsException(toIndex);
    }
}

下一篇 LDAP部署

评论

目录