使用 RxCpp 构造 Observer/Observable 模式

Construct Observer/Observable pattern using RxCpp

我正在尝试在 Rx-cpp. These is a very interesting tutorial in Rx.Net 中实施一个 observer/observable 模式,以了解人们如何做到这一点。

在此 C# 示例中,有特定的 interfaces 我们必须覆盖:

public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}


public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

据我了解,在Rx-cpp中没有这样的便利。那么,是否可以向我提供一些类似于上面的 interfaces 的 header 示例 (myObservable.h/myObserver.h),我可以将其用作定义相同的通信模式?

非常感谢任何帮助, 谢谢!

编辑 1: 感谢 @zentrunix,我正在尝试进行面向 class 的交流。到目前为止,我有可观察模式的代码如下。我想要的是定义一个观察者列表,我会将其附加到可观察对象中,并且在调用 OnNext 时应通知这些观察者。但是,还有遗漏的部分。

  1. 当调用 myObservable::Subscribe() 函数时,我如何 subscribe() 观察那些观察者 (Rx::subscribers<int>)。
  2. 我还能怎样unsubscribe()
  3. 最后,多个onNext观察者中对应的o.subscribe(onNext, onEnd);会怎样?能不能构造一个对应的myObserverclass? (再次受到 here 的启发)
  4. 不好意思问,这样的组织有意义吗?到目前为止,我一直在使用此 tutorial 中提供的体系结构,这就是我沉迷于此任务的原因。我发现它是参与 RxCpp 的一种方式。非常感谢任何意见。(再次为我的无知感到抱歉。)

    class myObservable {
    
    private:
    
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers;
    
    public:
    
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); };
    
    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) {
    
        return Rx::observable<>::create<int>([&, out]() {
            auto it = observers->insert(observers->end(), *out);
            it->add([=]() {
                observers->erase(it);
            });
        });
    
    };
    
    void OnNext(int sendItem) {
    
        for (Rx::subscriber<int> observer : *observers) {
            (observer).on_next(sendItem);
        }
    }
    
    void Disposer(Rx::subscriber<int> out) {
    
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end());
    };
    };
    

下面的 RxCpp 中的一个非常简单的例子。 不过(至少)有一个警告:典型的 RxCpp 代码大量使用 lambda,我非常不喜欢它。

我也试图在 Internet 上查找文档和教程,但找不到。我对线程模型的解释特别感兴趣。

如果您愿意仔细阅读代码和 Doxygen 文档,RxCpp GitHub 站点中有很多示例。

#include <iostream>
#include <exception>

#include "rxcpp/rx.hpp"
namespace rx = rxcpp;

static void onNext(int n) { std::cout << "* " << n << "\n"; }
static void onEnd() { std::cout << "* end\n"; }

static void onError(std::exception_ptr ep)
{
  try { std::rethrow_exception(ep); }
  catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
}

static void observableImpl(rx::subscriber<int> s)
{
  s.on_next(1);
  s.on_next(2);
  s.on_completed();
}

int main()
{
  auto o = rxcpp::observable<>::create<int>(observableImpl);
  std::cout << "*\n";
  o.subscribe(onNext, onEnd);
}