C++ 过滤器管道

C++ Filter Pipeline

我想为我的应用程序开发一个过滤器管道。 管道应由任意数量的过滤器组成。

对于过滤器,我声明了一个抽象基础 class,如下所示:

struct AbstractFilter {
    virtual void execute(const std::string& message) = 0;
    virtual ~AbstractFilter() = default;
}

每个过滤器都应该继承自这个基础 class 并实现 execute 方法。 像这样:

struct PrintMessage : public AbstractFilter {
    void execute(const std::string& message) override {
        std::cout << "Filter A " << message << '\n';
        //hand over message to next Filter

    }
}

struct Upper : public AbstractFilter {
    void execute(const std::string& message) override {
        std::string new_line;
        for (char c : line)
           new_line.push_back(std::toupper(c));
        //hand over message to next Filter
    }
}

struct WriteToFile : public AbstractFilter {
    void execute(const std::string& message) override {
        std::ofstream of{"test.txt"};
        of << message;
        of.close();
    }
}

编辑 1:

消息应该从管道中的一个过滤器发送到下一个过滤器。 例如,如果管道是这样的:

上层 -- PrintMessage -- WriteToFile

消息应该通过所有 3 个过滤器。 (例如,如果 Upper 完成了他的工作,消息应该发送到 PrintMessage 等等)

在上面的示例中,如果消息 Hello World 被发送到管道,输出应该是:

Console:
HELLO WORLD
test.txt:
HELLO WORLD

编辑 2:

过滤器仅更改给定消息的内容。类型没有改变。每个过滤器都应该使用例如字符串或给定的 class。 邮件仅转发给一位收件人。

我现在的问题是如何连接这些过滤器?

我的第一个猜测是使用 Queues。所以每个过滤器都有一个 InputOutput 队列。为此,我认为每个过滤器都应该 运行 在它自己的 Thread 内,并且在数据添加到他的 Input 队列时得到通知。 (例如 FilterA 的 Output 队列也是 FilterB 的 Input 队列)

我的第二个猜测是使用责任链模式和 boost::signals2 例如,FilterB 连接到 FilterA 的信号。 FilterA 在完成工作后调用这些 Filter。

这两种解决方案哪个更灵活?还是有更好的方法来连接过滤器?

一个附加问题是否也可以 运行 整个管道在一个线程中,以便我可以启动多个管道? (在示例中有 3 个 FilterA-FilterB-FilterD 管道向上 运行ning?)

我认为 AbstractFilter 不是必需的,我建议使用 std::tuple 来定义管道:

std::tuple<FilterA, FilterB> pipeline1;
std::tuple<FilterA, FilterB, FilterC ... > pipeline2;

要通过管道 运行 消息执行(使用 c++17):

template<typename Pipeline>
void run_in_pipeline(const std::string& message, Pipeline& pipeline){
  std::apply([&message](auto&& ... filter) {
    (filter.execute(message), ...);
  }, pipeline);
}

如果您关心性能并且过滤器必须按顺序执行,我不建议在单个管道上使用多线程或信号槽模式。如果您正在处理多线程应用程序,请考虑 运行 不同线程上的不同管道

我会这样处理: 创建一个包含抽象过滤器所有已实现版本的列表。所以,按照你的例子,在阅读输入文件后,我会得到一个列表:

[0]:Upper 
[1]:PrintMessage
[2]:WriteToFile

然后单个线程(如果您需要一次处理多个字符串,则为线程轮询)在输入队列中等待一个字符串。当池中出现新字符串时,线程在过滤器列表上循环,最后将结果发布到输出队列中。

如果你想运行它并行,你需要找到一种方法来保持输入字符串和输出字符串的顺序。

我相信责任链模式更简单,允许更简洁的代码和更大的灵活性。

您不需要第三方库来实现它。
你所说的过滤器实际上是处理程序。所有处理程序都实现一个通用接口,定义一个可以命名为 handle() 的方法,甚至可以将一个对象作为参数来共享状态。每个处理程序存储指向下一个处理程序的指针。它可能会也可能不会在其上调用该方法;在后一种情况下,处理停止,它充当 filter.

运行 如果其中一些流水线阶段需要其他流水线阶段的输出作为输入,则并行流水线阶段会涉及更多。对于并行 运行 的不同管道,每个管道都会 运行 在自己的线程上,您可以使用队列将输入传递给它。