C++ 过滤器管道
C++ Filter Pipeline
我想为我的应用程序开发一个过滤器管道。
管道应由任意数量的过滤器组成。
对于过滤器,我声明了一个抽象基础 class,如下所示:
struct AbstractFilter {
virtual void execute(const std::string& message) = 0;
virtual ~AbstractFilter() = default;
}
每个过滤器都应该继承自这个基础 class 并实现 execute
方法。
像这样:
struct PrintMessage : public AbstractFilter {
void execute(const std::string& message) override {
std::cout << "Filter A " << message << '\n';
//hand over message to next Filter
}
}
struct Upper : public AbstractFilter {
void execute(const std::string& message) override {
std::string new_line;
for (char c : line)
new_line.push_back(std::toupper(c));
//hand over message to next Filter
}
}
struct WriteToFile : public AbstractFilter {
void execute(const std::string& message) override {
std::ofstream of{"test.txt"};
of << message;
of.close();
}
}
编辑 1:
消息应该从管道中的一个过滤器发送到下一个过滤器。
例如,如果管道是这样的:
上层 -- PrintMessage -- WriteToFile
消息应该通过所有 3 个过滤器。 (例如,如果 Upper
完成了他的工作,消息应该发送到 PrintMessage
等等)
在上面的示例中,如果消息 Hello World
被发送到管道,输出应该是:
Console:
HELLO WORLD
test.txt:
HELLO WORLD
编辑 2:
过滤器仅更改给定消息的内容。类型没有改变。每个过滤器都应该使用例如字符串或给定的 class。
邮件仅转发给一位收件人。
我现在的问题是如何连接这些过滤器?
我的第一个猜测是使用 Queues
。所以每个过滤器都有一个 Input
和 Output
队列。为此,我认为每个过滤器都应该 运行 在它自己的 Thread
内,并且在数据添加到他的 Input
队列时得到通知。 (例如 FilterA 的 Output
队列也是 FilterB 的 Input
队列)
我的第二个猜测是使用责任链模式和 boost::signals2
例如,FilterB 连接到 FilterA 的信号。 FilterA 在完成工作后调用这些 Filter。
这两种解决方案哪个更灵活?还是有更好的方法来连接过滤器?
一个附加问题是否也可以 运行 整个管道在一个线程中,以便我可以启动多个管道? (在示例中有 3 个 FilterA-FilterB-FilterD 管道向上 运行ning?)
我认为 AbstractFilter 不是必需的,我建议使用 std::tuple 来定义管道:
std::tuple<FilterA, FilterB> pipeline1;
std::tuple<FilterA, FilterB, FilterC ... > pipeline2;
要通过管道 运行 消息执行(使用 c++17):
template<typename Pipeline>
void run_in_pipeline(const std::string& message, Pipeline& pipeline){
std::apply([&message](auto&& ... filter) {
(filter.execute(message), ...);
}, pipeline);
}
如果您关心性能并且过滤器必须按顺序执行,我不建议在单个管道上使用多线程或信号槽模式。如果您正在处理多线程应用程序,请考虑 运行 不同线程上的不同管道
我会这样处理:
创建一个包含抽象过滤器所有已实现版本的列表。所以,按照你的例子,在阅读输入文件后,我会得到一个列表:
[0]:Upper
[1]:PrintMessage
[2]:WriteToFile
然后单个线程(如果您需要一次处理多个字符串,则为线程轮询)在输入队列中等待一个字符串。当池中出现新字符串时,线程在过滤器列表上循环,最后将结果发布到输出队列中。
如果你想运行它并行,你需要找到一种方法来保持输入字符串和输出字符串的顺序。
我相信责任链模式更简单,允许更简洁的代码和更大的灵活性。
您不需要第三方库来实现它。
你所说的过滤器实际上是处理程序。所有处理程序都实现一个通用接口,定义一个可以命名为 handle()
的方法,甚至可以将一个对象作为参数来共享状态。每个处理程序存储指向下一个处理程序的指针。它可能会也可能不会在其上调用该方法;在后一种情况下,处理停止,它充当 filter.
运行 如果其中一些流水线阶段需要其他流水线阶段的输出作为输入,则并行流水线阶段会涉及更多。对于并行 运行 的不同管道,每个管道都会 运行 在自己的线程上,您可以使用队列将输入传递给它。
我想为我的应用程序开发一个过滤器管道。 管道应由任意数量的过滤器组成。
对于过滤器,我声明了一个抽象基础 class,如下所示:
struct AbstractFilter {
virtual void execute(const std::string& message) = 0;
virtual ~AbstractFilter() = default;
}
每个过滤器都应该继承自这个基础 class 并实现 execute
方法。
像这样:
struct PrintMessage : public AbstractFilter {
void execute(const std::string& message) override {
std::cout << "Filter A " << message << '\n';
//hand over message to next Filter
}
}
struct Upper : public AbstractFilter {
void execute(const std::string& message) override {
std::string new_line;
for (char c : line)
new_line.push_back(std::toupper(c));
//hand over message to next Filter
}
}
struct WriteToFile : public AbstractFilter {
void execute(const std::string& message) override {
std::ofstream of{"test.txt"};
of << message;
of.close();
}
}
编辑 1:
消息应该从管道中的一个过滤器发送到下一个过滤器。 例如,如果管道是这样的:
上层 -- PrintMessage -- WriteToFile
消息应该通过所有 3 个过滤器。 (例如,如果 Upper
完成了他的工作,消息应该发送到 PrintMessage
等等)
在上面的示例中,如果消息 Hello World
被发送到管道,输出应该是:
Console:
HELLO WORLD
test.txt:
HELLO WORLD
编辑 2:
过滤器仅更改给定消息的内容。类型没有改变。每个过滤器都应该使用例如字符串或给定的 class。 邮件仅转发给一位收件人。
我现在的问题是如何连接这些过滤器?
我的第一个猜测是使用 Queues
。所以每个过滤器都有一个 Input
和 Output
队列。为此,我认为每个过滤器都应该 运行 在它自己的 Thread
内,并且在数据添加到他的 Input
队列时得到通知。 (例如 FilterA 的 Output
队列也是 FilterB 的 Input
队列)
我的第二个猜测是使用责任链模式和 boost::signals2
例如,FilterB 连接到 FilterA 的信号。 FilterA 在完成工作后调用这些 Filter。
这两种解决方案哪个更灵活?还是有更好的方法来连接过滤器?
一个附加问题是否也可以 运行 整个管道在一个线程中,以便我可以启动多个管道? (在示例中有 3 个 FilterA-FilterB-FilterD 管道向上 运行ning?)
我认为 AbstractFilter 不是必需的,我建议使用 std::tuple 来定义管道:
std::tuple<FilterA, FilterB> pipeline1;
std::tuple<FilterA, FilterB, FilterC ... > pipeline2;
要通过管道 运行 消息执行(使用 c++17):
template<typename Pipeline>
void run_in_pipeline(const std::string& message, Pipeline& pipeline){
std::apply([&message](auto&& ... filter) {
(filter.execute(message), ...);
}, pipeline);
}
如果您关心性能并且过滤器必须按顺序执行,我不建议在单个管道上使用多线程或信号槽模式。如果您正在处理多线程应用程序,请考虑 运行 不同线程上的不同管道
我会这样处理: 创建一个包含抽象过滤器所有已实现版本的列表。所以,按照你的例子,在阅读输入文件后,我会得到一个列表:
[0]:Upper
[1]:PrintMessage
[2]:WriteToFile
然后单个线程(如果您需要一次处理多个字符串,则为线程轮询)在输入队列中等待一个字符串。当池中出现新字符串时,线程在过滤器列表上循环,最后将结果发布到输出队列中。
如果你想运行它并行,你需要找到一种方法来保持输入字符串和输出字符串的顺序。
我相信责任链模式更简单,允许更简洁的代码和更大的灵活性。
您不需要第三方库来实现它。
你所说的过滤器实际上是处理程序。所有处理程序都实现一个通用接口,定义一个可以命名为 handle()
的方法,甚至可以将一个对象作为参数来共享状态。每个处理程序存储指向下一个处理程序的指针。它可能会也可能不会在其上调用该方法;在后一种情况下,处理停止,它充当 filter.
运行 如果其中一些流水线阶段需要其他流水线阶段的输出作为输入,则并行流水线阶段会涉及更多。对于并行 运行 的不同管道,每个管道都会 运行 在自己的线程上,您可以使用队列将输入传递给它。