如何消除并行 std::transform_reduce() 的中间容器?
How to eliminate intermediate container for parallel std::transform_reduce()?
经常,我必须找到 Sum( f(i), 1, N )
或 Product( f(i), 1, N )
,其中 f(i)
是计算密集型 CPU,而积分 i 来自序列范围但很大。
使用 C++20 编译器我可以编写函数:
uint64_t solution(uint64_t N)
{
std::vector<uint64_t> v(N);
std::iota(v.begin(), v.end(), 1ULL);
return std::transform_reduce(
std::execution::par,
v.cbegin(), v.cend(),
0ull,
std::plus<>(),
[]f(const uint64_t& i)->uint64_t {
uint64_t result(0);
// expensive computation of result=f(i) goes here
// ...
return result;
});
}
但这会受到 RAM 的限制。
如何仅使用 C++20 STL(即没有供应商特定库或第 3 方库)在 运行 时间内完全消除输入向量的中间内存操作,同时实现高效的并行执行?
免责声明:我之前没有实现迭代器或使用 C++20 的经验
这似乎适用于 gcc 10.1 和 -std=c++2a
。我在很短的时间内就把它放在一起,没有考虑太多,所以如果只是通过模板化的话,实现肯定可以得到改进。如果将 operator<=>
换成旧的双向比较运算符,这也应该 运行 与 C++17,但我还没有测试过。如果您发现任何错误或易于纠正的设计缺陷,欢迎您在下面发表评论,以便改进此答案。
#include <cstddef>
#if __cplusplus > 201703L
#include <compare>
#endif
#include <execution>
#include <iostream>
#include <iterator>
#include <numeric>
class counting_iterator {
public:
typedef std::ptrdiff_t difference_type;
typedef std::ptrdiff_t value_type;
typedef void pointer;
typedef void reference;
typedef std::random_access_iterator_tag iterator_category;
private:
value_type val_{0};
public:
counting_iterator() = default;
explicit counting_iterator(value_type init) noexcept : val_{init} {}
value_type operator*() const noexcept { return val_; }
value_type operator[](difference_type index) const noexcept {
return val_ + index;
}
counting_iterator &operator++() noexcept {
++val_;
return *this;
}
counting_iterator operator++(int) noexcept {
counting_iterator res{*this};
++(*this);
return res;
}
counting_iterator &operator--() noexcept {
--val_;
return *this;
}
counting_iterator operator--(int) noexcept {
counting_iterator res{*this};
--(*this);
return res;
}
friend counting_iterator operator+(counting_iterator const &it,
difference_type const &offset) noexcept;
friend counting_iterator operator+(difference_type const &offset,
counting_iterator const &it) noexcept;
friend counting_iterator operator-(counting_iterator const &it,
difference_type const &offset) noexcept;
friend difference_type operator-(counting_iterator const &a,
counting_iterator const &b) noexcept;
counting_iterator &operator+=(difference_type offset) noexcept {
val_ += offset;
return *this;
}
counting_iterator &operator-=(difference_type offset) noexcept {
val_ -= offset;
return *this;
}
friend bool operator==(counting_iterator const &a,
counting_iterator const &b) noexcept;
#if __cplusplus > 201703L
friend std::strong_ordering operator<=>(counting_iterator const &a,
counting_iterator const &b);
#else
friend bool operator!=(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator<=(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator>=(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator<(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator>(counting_iterator const &a,
counting_iterator const &b) noexcept;
#endif
};
counting_iterator
operator+(counting_iterator const &it,
counting_iterator::difference_type const &offset) noexcept {
return counting_iterator{it.val_ + offset};
}
counting_iterator operator+(counting_iterator::difference_type const &offset,
counting_iterator const &it) noexcept {
return counting_iterator{it.val_ + offset};
}
counting_iterator
operator-(counting_iterator const &it,
counting_iterator::difference_type const &offset) noexcept {
return counting_iterator{it.val_ - offset};
}
counting_iterator::difference_type
operator-(counting_iterator const &a, counting_iterator const &b) noexcept {
return a.val_ - b.val_;
}
bool operator==(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ == b.val_;
}
#if __cplusplus > 201703L
std::strong_ordering operator<=>(counting_iterator const &a,
counting_iterator const &b) {
return a.val_ <=> b.val_;
}
#else
bool operator!=(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ != b.val_;
}
bool operator<=(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ <= b.val_;
}
bool operator>=(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ >= b.val_;
}
bool operator<(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ < b.val_;
}
bool operator>(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ > b.val_;
}
#endif
int main() {
auto res = std::transform_reduce(
std::execution::par,
counting_iterator(0), counting_iterator(10),
0L,
std::plus<>(),
[](const std::ptrdiff_t& i) { return i * i; });
std::cout << res << std::endl;
}
编辑:我研究了 class 以使其也可用于 C++17。现在它还显式定义了 std::random_access_iterator_tag
。我仍然没有使用该执行策略进行任何并行计算,无论是迭代器还是向量,所以我不知道 class 本身是否会抑制并行执行。
经过一些修改和实验后,我确认双向迭代器(基于上面 Paul 的示例)有效:
class counting_iterator {
public:
using iterator_category = std::bidirectional_iterator_tag;
using difference_type = std::ptrdiff_t;
using value_type = std::ptrdiff_t;
private:
value_type val_;
public:
counting_iterator() : val_(0) {}
explicit counting_iterator(value_type init) : val_(init) {}
value_type operator*() noexcept { return val_; }
const value_type& operator*() const noexcept { return val_; }
counting_iterator& operator++() noexcept { ++val_; return *this; }
counting_iterator operator++(int) noexcept { counting_iterator res{ *this }; ++(*this); return res; }
counting_iterator& operator--() noexcept { --val_; return *this; }
counting_iterator operator--(int) noexcept { counting_iterator res{ *this }; --(*this); return res; }
value_type operator[](difference_type index) noexcept { return val_ + index; }
counting_iterator& operator+=(difference_type offset) noexcept { val_ += offset; return *this; }
counting_iterator& operator-=(difference_type offset) noexcept { val_ -= offset; return *this; }
counting_iterator operator+(difference_type offset) const noexcept { return counting_iterator{ *this } += offset; };
/*counting_iterator& operator+(difference_type offset) noexcept { return operator+=(offset); }*/
counting_iterator operator-(difference_type offset) const noexcept { return counting_iterator{ *this } -= offset; };
/*counting_iterator& operator-(difference_type offset) noexcept { return operator-=(offset); }*/
difference_type operator-(counting_iterator const& other) noexcept { return val_ - other.val_; }
bool operator<(counting_iterator const& b) const noexcept { return val_ < b.val_; }
bool operator==(counting_iterator const& b) const noexcept { return val_ == b.val_; }
bool operator!=(counting_iterator const& b) const noexcept { return !operator==(b); }
std::strong_ordering operator<=>(counting_iterator const& b) const noexcept { return val_ <=> b.val_; }
};
尽管 std::transform_reduce
与 iterator_category = std::random_access_iterator_tag
并行,但我无法使其工作,我认为这是性能下降的原因。
更新:
在上面的代码中,注释行使 MS 编译器选择了它们而不是复制版本替代方案,并且如果迭代器被标记为 random_access_category_tag.
,则会在并行执行期间造成严重破坏
经常,我必须找到 Sum( f(i), 1, N )
或 Product( f(i), 1, N )
,其中 f(i)
是计算密集型 CPU,而积分 i 来自序列范围但很大。
使用 C++20 编译器我可以编写函数:
uint64_t solution(uint64_t N)
{
std::vector<uint64_t> v(N);
std::iota(v.begin(), v.end(), 1ULL);
return std::transform_reduce(
std::execution::par,
v.cbegin(), v.cend(),
0ull,
std::plus<>(),
[]f(const uint64_t& i)->uint64_t {
uint64_t result(0);
// expensive computation of result=f(i) goes here
// ...
return result;
});
}
但这会受到 RAM 的限制。
如何仅使用 C++20 STL(即没有供应商特定库或第 3 方库)在 运行 时间内完全消除输入向量的中间内存操作,同时实现高效的并行执行?
免责声明:我之前没有实现迭代器或使用 C++20 的经验
这似乎适用于 gcc 10.1 和 -std=c++2a
。我在很短的时间内就把它放在一起,没有考虑太多,所以如果只是通过模板化的话,实现肯定可以得到改进。如果将 operator<=>
换成旧的双向比较运算符,这也应该 运行 与 C++17,但我还没有测试过。如果您发现任何错误或易于纠正的设计缺陷,欢迎您在下面发表评论,以便改进此答案。
#include <cstddef>
#if __cplusplus > 201703L
#include <compare>
#endif
#include <execution>
#include <iostream>
#include <iterator>
#include <numeric>
class counting_iterator {
public:
typedef std::ptrdiff_t difference_type;
typedef std::ptrdiff_t value_type;
typedef void pointer;
typedef void reference;
typedef std::random_access_iterator_tag iterator_category;
private:
value_type val_{0};
public:
counting_iterator() = default;
explicit counting_iterator(value_type init) noexcept : val_{init} {}
value_type operator*() const noexcept { return val_; }
value_type operator[](difference_type index) const noexcept {
return val_ + index;
}
counting_iterator &operator++() noexcept {
++val_;
return *this;
}
counting_iterator operator++(int) noexcept {
counting_iterator res{*this};
++(*this);
return res;
}
counting_iterator &operator--() noexcept {
--val_;
return *this;
}
counting_iterator operator--(int) noexcept {
counting_iterator res{*this};
--(*this);
return res;
}
friend counting_iterator operator+(counting_iterator const &it,
difference_type const &offset) noexcept;
friend counting_iterator operator+(difference_type const &offset,
counting_iterator const &it) noexcept;
friend counting_iterator operator-(counting_iterator const &it,
difference_type const &offset) noexcept;
friend difference_type operator-(counting_iterator const &a,
counting_iterator const &b) noexcept;
counting_iterator &operator+=(difference_type offset) noexcept {
val_ += offset;
return *this;
}
counting_iterator &operator-=(difference_type offset) noexcept {
val_ -= offset;
return *this;
}
friend bool operator==(counting_iterator const &a,
counting_iterator const &b) noexcept;
#if __cplusplus > 201703L
friend std::strong_ordering operator<=>(counting_iterator const &a,
counting_iterator const &b);
#else
friend bool operator!=(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator<=(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator>=(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator<(counting_iterator const &a,
counting_iterator const &b) noexcept;
friend bool operator>(counting_iterator const &a,
counting_iterator const &b) noexcept;
#endif
};
counting_iterator
operator+(counting_iterator const &it,
counting_iterator::difference_type const &offset) noexcept {
return counting_iterator{it.val_ + offset};
}
counting_iterator operator+(counting_iterator::difference_type const &offset,
counting_iterator const &it) noexcept {
return counting_iterator{it.val_ + offset};
}
counting_iterator
operator-(counting_iterator const &it,
counting_iterator::difference_type const &offset) noexcept {
return counting_iterator{it.val_ - offset};
}
counting_iterator::difference_type
operator-(counting_iterator const &a, counting_iterator const &b) noexcept {
return a.val_ - b.val_;
}
bool operator==(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ == b.val_;
}
#if __cplusplus > 201703L
std::strong_ordering operator<=>(counting_iterator const &a,
counting_iterator const &b) {
return a.val_ <=> b.val_;
}
#else
bool operator!=(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ != b.val_;
}
bool operator<=(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ <= b.val_;
}
bool operator>=(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ >= b.val_;
}
bool operator<(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ < b.val_;
}
bool operator>(counting_iterator const &a,
counting_iterator const &b) noexcept {
return a.val_ > b.val_;
}
#endif
int main() {
auto res = std::transform_reduce(
std::execution::par,
counting_iterator(0), counting_iterator(10),
0L,
std::plus<>(),
[](const std::ptrdiff_t& i) { return i * i; });
std::cout << res << std::endl;
}
编辑:我研究了 class 以使其也可用于 C++17。现在它还显式定义了 std::random_access_iterator_tag
。我仍然没有使用该执行策略进行任何并行计算,无论是迭代器还是向量,所以我不知道 class 本身是否会抑制并行执行。
经过一些修改和实验后,我确认双向迭代器(基于上面 Paul 的示例)有效:
class counting_iterator {
public:
using iterator_category = std::bidirectional_iterator_tag;
using difference_type = std::ptrdiff_t;
using value_type = std::ptrdiff_t;
private:
value_type val_;
public:
counting_iterator() : val_(0) {}
explicit counting_iterator(value_type init) : val_(init) {}
value_type operator*() noexcept { return val_; }
const value_type& operator*() const noexcept { return val_; }
counting_iterator& operator++() noexcept { ++val_; return *this; }
counting_iterator operator++(int) noexcept { counting_iterator res{ *this }; ++(*this); return res; }
counting_iterator& operator--() noexcept { --val_; return *this; }
counting_iterator operator--(int) noexcept { counting_iterator res{ *this }; --(*this); return res; }
value_type operator[](difference_type index) noexcept { return val_ + index; }
counting_iterator& operator+=(difference_type offset) noexcept { val_ += offset; return *this; }
counting_iterator& operator-=(difference_type offset) noexcept { val_ -= offset; return *this; }
counting_iterator operator+(difference_type offset) const noexcept { return counting_iterator{ *this } += offset; };
/*counting_iterator& operator+(difference_type offset) noexcept { return operator+=(offset); }*/
counting_iterator operator-(difference_type offset) const noexcept { return counting_iterator{ *this } -= offset; };
/*counting_iterator& operator-(difference_type offset) noexcept { return operator-=(offset); }*/
difference_type operator-(counting_iterator const& other) noexcept { return val_ - other.val_; }
bool operator<(counting_iterator const& b) const noexcept { return val_ < b.val_; }
bool operator==(counting_iterator const& b) const noexcept { return val_ == b.val_; }
bool operator!=(counting_iterator const& b) const noexcept { return !operator==(b); }
std::strong_ordering operator<=>(counting_iterator const& b) const noexcept { return val_ <=> b.val_; }
};
尽管 std::transform_reduce
与 iterator_category = std::random_access_iterator_tag
并行,但我无法使其工作,我认为这是性能下降的原因。
更新: 在上面的代码中,注释行使 MS 编译器选择了它们而不是复制版本替代方案,并且如果迭代器被标记为 random_access_category_tag.
,则会在并行执行期间造成严重破坏