`std::list<>::sort()` - 为什么突然切换到自上而下策略?

`std::list<>::sort()` - why the sudden switch to top-down strategy?

我记得自古以来最流行的实现 std::list<>::sort() 的方法就是在 bottom-up fashion (see also What makes the gcc std::list sort implementation so fast? 中实现的经典归并排序算法。

我记得看到有人恰当地将此策略称为 "onion chaining" 方法。

至少在 GCC 的 C++ 标准库实现中是这样的(例如,参见 here)。这就是 MSVC 标准库版本中旧 Dimkumware 的 STL 以及所有版本的 MSVC 一直到 VS2013 中的情况。

但是,VS2015附带的标准库突然不再遵循这种排序策略了。 VS2015 附带的库使用 自上而下 合并排序的相当简单的递归实现。这让我觉得很奇怪,因为自上而下的方法需要访问列表的中点才能将其分成两半。由于 std::list<> 不支持随机访问,找到中点的唯一方法是逐字遍历列表的一半。此外,一开始就需要知道列表中元素的总数(这在 C++11 之前不一定是 O(1) 操作)。

然而,VS2015 中的 std::list<>::sort() 正是这样做的。这是该实现的摘录,它定位中点并执行递归调用

...
iterator _Mid = _STD next(_First, _Size / 2);
_First = _Sort(_First, _Mid, _Pred, _Size / 2);
_Mid = _Sort(_Mid, _Last, _Pred, _Size - _Size / 2);
...

如您所见,他们只是漫不经心地使用 std::next 遍历列表的前半部分并到达 _Mid 迭代器。

我想知道这种转变背后的原因是什么?我所看到的只是在每个递归级别重复调用 std::next 看似明显的低效率。天真的逻辑说这是。如果他们愿意付出这样的代价,估计也能在return中有所收获。那他们得到什么?我没有立即看到该算法具有更好的缓存行为(与最初的自下而上方法相比)。我不会立即认为它在预排序序列上表现更好。

当然,因为 C++11 std::list<> 基本上需要存储它的元素计数,这使得上面的代码稍微更有效率,因为我们总是提前知道元素计数。但这似乎仍然不足以证明在每个递归级别上进行顺序扫描是合理的。

(不可否认,我没有尝试将这些实现相互竞争。也许那里有一些惊喜。)

请注意,此答案已更新,通过从列表数组到迭代器数组进行相同的更改,同时保留更快的自下而上合并,解决了下面评论中和问题之后提到的所有问题排序算法,并用自上而下的归并排序算法消除了递归导致的栈溢出的小几率。

我最初没有考虑迭代器的原因是由于 VS2015 更改为自上而下,这让我相信尝试更改现有的自下而上算法以使用迭代器存在一些问题,需要切换到较慢的自顶向下算法。直到我自己尝试分析切换到迭代器时,我才意识到自下而上算法是有解决方案的。

在@sbi 的评论中,他询问自上而下方法的作者 Stephan T. Lavavej 为什么要进行更改。斯蒂芬的回应是“避免内存分配和默认构造分配器”。 VS2015 引入了非默认可构造和有状态的分配器,这在使用先前版本的列表数组时会出现问题,因为列表的每个实例都会分配一个虚拟节点,并且需要进行更改才能处理没有默认分配器。

Lavavej 的解决方案是改用迭代器来跟踪原始列表中的 运行 边界,而不是内部列表数组。合并逻辑更改为使用 3 个迭代器参数,第一个参数是左侧开始的迭代器 运行,第二个参数是左侧结束的迭代器 运行 == 右侧开始的迭代器 运行 , 第三个参数是右端 运行 的迭代器。合并过程使用 std::list::splice 在合并操作期间移动原始列表中的节点。这具有异常安全的额外好处。如果调用者的比较函数抛出异常,列表将重新排序,但不会发生数据丢失(假设 splice 不会失败)。使用之前的方案,如果发生异常,部分(或大部分)数据将在列表的内部数组中,并且数据将从原始列表中丢失。

但是不需要切换到自上而下的合并排序。最初,我认为 VS2015 切换到自上而下的原因对我来说是未知的,所以我专注于以与 std::list::splice 相同的方式使用内部接口。后来我决定研究自底向上切换以使用迭代器数组。我意识到存储在内部数组中的 运行s 的顺序是最新的(array[0] = 最右边)到最旧的(array[last] = 最左边),并且它可以使用与 VS2015 相同的基于迭代器的合并逻辑自上而下的方法。

对于自下而上归并排序,array[i] 是指向具有 2^i 个节点的已排序子列表开头的迭代器,或者它为空(使用 std::list::end 表示空)。每个已排序子列表的末尾将是数组中下一个先前非空条目中已排序子列表的开始,或者如果在数组的开头,则在本地迭代器中(它指向最新的结束运行)。类似于自上而下的方法,迭代器数组仅用于跟踪原始链表中已排序的 运行 边界,而合并过程使用 std::list::splice 移动原始链表中的节点。

如果一个链表很大,节点比较分散,会出现很多cache miss。自下而上将比自上而下快约 30%(相当于说明自上而下比自下而上慢约 42%)。再一次,如果有足够的内存,将列表移动到数组或向量、对数组或向量进行排序,然后从排序后的数组或向量创建一个新列表通常会更快。

示例 C++ 代码:

#define ASZ 32

template <typename T>
void SortList(std::list<T> &ll)
{
    if (ll.size() < 2)                  // return if nothing to do
        return;
    std::list<T>::iterator ai[ASZ];     // array of iterators
    std::list<T>::iterator mi;          // middle iterator (end lft, bgn rgt)
    std::list<T>::iterator ei;          // end    iterator
    size_t i;
    for (i = 0; i < ASZ; i++)           // "clear" array
        ai[i] = ll.end();
    // merge nodes into array
    for (ei = ll.begin(); ei != ll.end();) {
        mi = ei++;
        for (i = 0; (i < ASZ) && ai[i] != ll.end(); i++) {
            mi = Merge(ll, ai[i], mi, ei);
            ai[i] = ll.end();
        }
        if(i == ASZ)
            i--;
        ai[i] = mi;
    }
    // merge array into single list
    ei = ll.end();                              
    for(i = 0; (i < ASZ) && ai[i] == ei; i++);
    mi = ai[i++];
    while(1){
        for( ; (i < ASZ) && ai[i] == ei; i++);
        if (i == ASZ)
            break;
        mi = Merge(ll, ai[i++], mi, ei);
    }
}

template <typename T>
typename std::list<T>::iterator Merge(std::list<T> &ll,
                             typename std::list<T>::iterator li,
                             typename std::list<T>::iterator mi,
                             typename std::list<T>::iterator ei)
{
    std::list<T>::iterator ni;
    (*mi < *li) ? ni = mi : ni = li;
    while(1){
        if(*mi < *li){
            ll.splice(li, ll, mi++);
            if(mi == ei)
                return ni;
        } else {
            if(++li == mi)
                return ni;
        }
    }
}

VS2019 的 std::list::sort() 的示例替换代码(合并逻辑被制作成一个单独的内部函数,因为它现在在两个地方使用)。

private:
    template <class _Pr2>
    iterator _Merge(_Pr2 _Pred, iterator _First, iterator _Mid, iterator _Last){
        iterator _Newfirst = _First;
        for (bool _Initial_loop = true;;
            _Initial_loop       = false) { // [_First, _Mid) and [_Mid, _Last) are sorted and non-empty
            if (_DEBUG_LT_PRED(_Pred, *_Mid, *_First)) { // consume _Mid
                if (_Initial_loop) {
                    _Newfirst = _Mid; // update return value
                }
                splice(_First, *this, _Mid++);
                if (_Mid == _Last) {
                    return _Newfirst; // exhausted [_Mid, _Last); done
                }
            }
            else { // consume _First
                ++_First;
                if (_First == _Mid) {
                    return _Newfirst; // exhausted [_First, _Mid); done
                }
            }
        }
    }

    template <class _Pr2>
    void _Sort(iterator _First, iterator _Last, _Pr2 _Pred,
        size_type _Size) { // order [_First, _Last), using _Pred, return new first
                           // _Size must be distance from _First to _Last
        if (_Size < 2) {
            return;        // nothing to do
        }
        const size_t _ASZ = 32;         // array size
        iterator _Ai[_ASZ];             // array of   iterators to runs
        iterator _Mi;                   // middle     iterator
        iterator _Li;                   // last (end) iterator
        size_t _I;                      // index to _Ai
        for (_I = 0; _I < _ASZ; _I++)   // "empty" array
            _Ai[_I] = _Last;            //   _Ai[] == _Last => empty entry
        // merge nodes into array
        for (_Li = _First; _Li != _Last;) {
            _Mi = _Li++;
            for (_I = 0; (_I < _ASZ) && _Ai[_I] != _Last; _I++) {
                _Mi = _Merge(_Pass_fn(_Pred), _Ai[_I], _Mi, _Li);
                _Ai[_I] = _Last;
            }
            if (_I == _ASZ)
                _I--;
            _Ai[_I] = _Mi;
        }
        // merge array runs into single run
        for (_I = 0; _I < _ASZ && _Ai[_I] == _Last; _I++);
        _Mi = _Ai[_I++];
        while (1) {
            for (; _I < _ASZ && _Ai[_I] == _Last; _I++);
            if (_I == _ASZ)
                break;
            _Mi = _Merge(_Pass_fn(_Pred), _Ai[_I++], _Mi, _Last);
        }
    }

此答案的其余部分是历史性的。


我能够根据@IgorTandetnik 的演示重现该问题(旧类型无法编译,新类型可以):

#include <iostream>
#include <list>
#include <memory>

template <typename T>
class MyAlloc : public std::allocator<T> {
public:
    MyAlloc(T) {}  // suppress default constructor
    
    template <typename U>
    MyAlloc(const MyAlloc<U>& other) : std::allocator<T>(other) {}
    
    template< class U > struct rebind { typedef MyAlloc<U> other; };
};

int main()
{
    std::list<int, MyAlloc<int>> l(MyAlloc<int>(0));
    l.push_back(3);
    l.push_back(0);
    l.push_back(2);
    l.push_back(1);
    l.sort();
    return 0;
}

我早在 2016 年 7 月就注意到了这一变化,并通过电子邮件发送给 P.J。 2016 年 8 月 1 日对这一变化的抱怨。他的回复片段:

Interestingly enough, our change log doesn't reflect this change. That probably means it was "suggested" by one of our larger customers and got by me on the code review. All I know now is that the change came in around the autumn of 2015. When I reviewed the code, the first thing that struck me was the line:

    iterator _Mid = _STD next(_First, _Size / 2);

which, of course, can take a very long time for a large list.

The code looks a bit more elegant than what I wrote in early 1995(!), but definitely has worse time complexity. That version was modeled after the approach by Stepanov, Lee, and Musser in the original STL. They are seldom found to be wrong in their choice of algorithms.

I'm now reverting to our latest known good version of the original code.

不知道P.J。 Plauger 对原始代码的恢复处理了新的分配器问题,或者 Microsoft 是否或如何与 Dinkumware 交互。

为了比较自上而下和自下而上的方法,我创建了一个包含 400 万个元素的链表,每个元素由一个 64 位无符号整数组成,假设我最终得到一个几乎按顺序排列的双向链表节点(即使它们是动态分配的),用随机数填充它们,然后对它们进行排序。节点不移动,只有链接改变,但现在遍历列表以随机顺序访问节点。然后我用另一组随机数填充那些随机排序的节点并再次对它们进行排序。我将 2015 年自上而下的方法与之前的自下而上方法进行了比较,该方法经过修改以匹配 2015 年所做的其他更改(sort() 现在调用带有谓词比较函数的 sort(),而不是具有两个单独的函数)。这些是结果。 更新 - 我添加了一个基于节点指针的版本,还注意到了从列表中简单地创建一个向量、排序向量、复制回来的时间。

sequential nodes: 2015 version 1.6 seconds, prior version 1.5  seconds
random nodes:     2015 version 4.0 seconds, prior version 2.8  seconds
random nodes:                  node pointer based version 2.6  seconds
random nodes:    create vector from list, sort, copy back 1.25 seconds

对于顺序节点,之前的版本只是快了一点点,但是对于随机节点,之前的版本快了30%,节点指针版本快了35%,并从列表中创建一个向量,对向量,然后复制回来的速度提高了 69%。

下面是 std::list::sort() 的第一个替换代码我用来比较之前的自下而上的小数组(_BinList[])方法与 VS2015 的自上而下的方法我希望比较是公平的,所以我修改了一份.

    void sort()
        {   // order sequence, using operator<
        sort(less<>());
        }

    template<class _Pr2>
        void sort(_Pr2 _Pred)
        {   // order sequence, using _Pred
        if (2 > this->_Mysize())
            return;
        const size_t _MAXBINS = 25;
        _Myt _Templist, _Binlist[_MAXBINS];
        while (!empty())
            {
            // _Templist = next element
            _Templist._Splice_same(_Templist.begin(), *this, begin(),
                ++begin(), 1);
            // merge with array of ever larger bins
            size_t _Bin;
            for (_Bin = 0; _Bin < _MAXBINS && !_Binlist[_Bin].empty();
                ++_Bin)
                _Templist.merge(_Binlist[_Bin], _Pred);
            // don't go past end of array
            if (_Bin == _MAXBINS)
                _Bin--;
            // update bin with merged list, empty _Templist
            _Binlist[_Bin].swap(_Templist);
            }
            // merge bins back into caller's list
            for (size_t _Bin = 0; _Bin < _MAXBINS; _Bin++)
                if(!_Binlist[_Bin].empty())
                    this->merge(_Binlist[_Bin], _Pred);
        }

我做了一些小改动。原始代码在名为 _Maxbin 的变量中跟踪实际的最大 bin,但最终合并的开销足够小,因此我删除了与 _Maxbin 关联的代码。在数组构建期间,原始代码的内部循环合并到一个 _Binlist[] 元素中,然后交换到 _Templist 中,这似乎毫无意义。我将内部循环更改为合并到 _Templist,只有在找到空的 _Binlist[] 元素后才进行交换。

下面是我用于另一个比较的 std::list::sort() 的基于节点指针的替换。这消除了与分配相关的问题。如果可能发生比较异常,则必须将数组和临时列表 (pNode) 中的所有节点追加回原始列表,或者可能将比较异常视为小于比较。

    void sort()
        {   // order sequence, using operator<
        sort(less<>());
        }

    template<class _Pr2>
        void sort(_Pr2 _Pred)
        {   // order sequence, using _Pred
        const size_t _NUMBINS = 25;
        _Nodeptr aList[_NUMBINS];           // array of lists
        _Nodeptr pNode;
        _Nodeptr pNext;
        _Nodeptr pPrev;
        if (this->size() < 2)               // return if nothing to do
            return;
        this->_Myhead()->_Prev->_Next = 0;  // set last node ->_Next = 0
        pNode = this->_Myhead()->_Next;     // set ptr to start of list
        size_t i;
        for (i = 0; i < _NUMBINS; i++)      // zero array
            aList[i] = 0;
        while (pNode != 0)                  // merge nodes into array
            {
            pNext = pNode->_Next;
            pNode->_Next = 0;
            for (i = 0; (i < _NUMBINS) && (aList[i] != 0); i++)
                {
                pNode = _MergeN(_Pred, aList[i], pNode);
                aList[i] = 0;
                }
            if (i == _NUMBINS)
                i--;
            aList[i] = pNode;
            pNode = pNext;
            }
        pNode = 0;                          // merge array into one list
        for (i = 0; i < _NUMBINS; i++)
            pNode = _MergeN(_Pred, aList[i], pNode);
        this->_Myhead()->_Next = pNode;     // update sentinel node links
        pPrev = this->_Myhead();            //  and _Prev pointers
        while (pNode)
            {
            pNode->_Prev = pPrev;
            pPrev = pNode;
            pNode = pNode->_Next;
            }
        pPrev->_Next = this->_Myhead();
        this->_Myhead()->_Prev = pPrev;
        }

    template<class _Pr2>
        _Nodeptr _MergeN(_Pr2 &_Pred, _Nodeptr pSrc1, _Nodeptr pSrc2)
        {
        _Nodeptr pDst = 0;          // destination head ptr
        _Nodeptr *ppDst = &pDst;    // ptr to head or prev->_Next
        if (pSrc1 == 0)
            return pSrc2;
        if (pSrc2 == 0)
            return pSrc1;
        while (1)
            {
            if (_DEBUG_LT_PRED(_Pred, pSrc2->_Myval, pSrc1->_Myval))
                {
                *ppDst = pSrc2;
                pSrc2 = *(ppDst = &pSrc2->_Next);
                if (pSrc2 == 0)
                    {
                    *ppDst = pSrc1;
                    break;
                    }
                }
            else
                {
                *ppDst = pSrc1;
                pSrc1 = *(ppDst = &pSrc1->_Next);
                if (pSrc1 == 0)
                    {
                    *ppDst = pSrc2;
                    break;
                    }
                }
            }
        return pDst;
        }

Stephan T. Lavavej, MSVC's standard library maintainer, who responded:

I did that to avoid memory allocation and default constructing allocators.

对此我将添加 "free basic exception safety"。

详细说明:VS2015 之前的实现存在几个缺陷:

  • _Myt _Templist, _Binlist[_MAXBINS]; 创建一堆中间 lists(_Myt 只是 list 当前实例化的类型定义;一个不那么令人困惑的拼写是,好吧,list) 在排序期间保存节点,但是这些 list 是默认构造的,这会导致许多问题:
    1. 如果使用的分配器不是默认可构造的(并且不要求分配器是默认可构造的),这根本不会编译,因为 list 的默认构造函数将尝试默认构造其分配器.
    2. 如果使用的分配器是有状态的,那么默认构造的分配器可能不等于 this->get_allocator(),这意味着后面的 splices 和 merges 在技术上是未定义的行为并且可能会在调试版本中中断。 ("Technically",因为节点最后都合并回来了,所以如果函数成功完成,你实际上并没有用错误的分配器解除分配。)
    3. Dinkumware 的 list 使用动态分配的哨兵节点,这意味着上面将执行 _MAXBINS + 1 动态分配。我怀疑很多人期望 sort 可能会抛出 bad_alloc。如果分配器是有状态的,那么这些哨兵节点可能甚至不会从正确的位置分配(参见#2)。
  • 代码不是异常安全的。特别是,比较允许抛出,如果它在中间 lists 中有元素时抛出,则这些元素在堆栈展开期间被 lists 简单地销毁。 sort 的用户当然不希望在 sort 抛出异常时对列表进行排序,但他们可能也不希望元素丢失。
    • 这与上面的 #2 交互非常差,因为现在它不仅仅是技术上的未定义行为:那些中间 list 的析构函数将解除分配并销毁使用错误的分配器拼接到它们中的节点。

这些缺陷可以修复吗?大概。 #1 和 #2 可以通过将 get_allocator() 传递给 list 的构造函数来修复:

 _Myt _Templist(get_allocator());
 _Myt _Binlist[_MAXBINS] = { _Myt(get_allocator()), _Myt(get_allocator()), 
                             _Myt(get_allocator()),  /* ... repeat _MAXBINS times */ };

异常安全问题可以通过用 try-catch 包围循环来解决,如果一个抛出异常。

修复 #3 更难,因为这意味着根本不使用 list 作为节点的持有者,这可能需要大量重构,但它是可行的。

问题是:是否值得跳过所有这些环节来提高设计性能降低的容器的性能?毕竟,真正关心性能的人可能不会首先使用 list