使用 RxCpp 构造 Observer/Observable 模式
Construct Observer/Observable pattern using RxCpp
我正在尝试在 Rx-cpp
. These is a very interesting tutorial in Rx.Net
中实施一个 observer/observable
模式,以了解人们如何做到这一点。
在此 C#
示例中,有特定的 interfaces
我们必须覆盖:
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
据我了解,在Rx-cpp
中没有这样的便利。那么,是否可以向我提供一些类似于上面的 interfaces
的 header 示例 (myObservable.h
/myObserver.h
),我可以将其用作定义相同的通信模式?
非常感谢任何帮助,
谢谢!
编辑 1:
感谢 @zentrunix
,我正在尝试进行面向 class 的交流。到目前为止,我有可观察模式的代码如下。我想要的是定义一个观察者列表,我会将其附加到可观察对象中,并且在调用 OnNext
时应通知这些观察者。但是,还有遗漏的部分。
- 当调用
myObservable::Subscribe()
函数时,我如何 subscribe()
观察那些观察者 (Rx::subscribers<int>
)。
- 我还能怎样
unsubscribe()
。
- 最后,多个
onNext
观察者中对应的o.subscribe(onNext, onEnd);
会怎样?能不能构造一个对应的myObserver
class? (再次受到 here 的启发)
不好意思问,这样的组织有意义吗?到目前为止,我一直在使用此 tutorial 中提供的体系结构,这就是我沉迷于此任务的原因。我发现它是参与 RxCpp
的一种方式。非常感谢任何意见。(再次为我的无知感到抱歉。)
class myObservable {
private:
std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers;
public:
myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); };
Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) {
return Rx::observable<>::create<int>([&, out]() {
auto it = observers->insert(observers->end(), *out);
it->add([=]() {
observers->erase(it);
});
});
};
void OnNext(int sendItem) {
for (Rx::subscriber<int> observer : *observers) {
(observer).on_next(sendItem);
}
}
void Disposer(Rx::subscriber<int> out) {
observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end());
};
};
下面的 RxCpp 中的一个非常简单的例子。
不过(至少)有一个警告:典型的 RxCpp 代码大量使用 lambda,我非常不喜欢它。
我也试图在 Internet 上查找文档和教程,但找不到。我对线程模型的解释特别感兴趣。
如果您愿意仔细阅读代码和 Doxygen 文档,RxCpp GitHub 站点中有很多示例。
#include <iostream>
#include <exception>
#include "rxcpp/rx.hpp"
namespace rx = rxcpp;
static void onNext(int n) { std::cout << "* " << n << "\n"; }
static void onEnd() { std::cout << "* end\n"; }
static void onError(std::exception_ptr ep)
{
try { std::rethrow_exception(ep); }
catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
}
static void observableImpl(rx::subscriber<int> s)
{
s.on_next(1);
s.on_next(2);
s.on_completed();
}
int main()
{
auto o = rxcpp::observable<>::create<int>(observableImpl);
std::cout << "*\n";
o.subscribe(onNext, onEnd);
}
我正在尝试在 Rx-cpp
. These is a very interesting tutorial in Rx.Net
中实施一个 observer/observable
模式,以了解人们如何做到这一点。
在此 C#
示例中,有特定的 interfaces
我们必须覆盖:
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
据我了解,在Rx-cpp
中没有这样的便利。那么,是否可以向我提供一些类似于上面的 interfaces
的 header 示例 (myObservable.h
/myObserver.h
),我可以将其用作定义相同的通信模式?
非常感谢任何帮助, 谢谢!
编辑 1:
感谢 @zentrunix
,我正在尝试进行面向 class 的交流。到目前为止,我有可观察模式的代码如下。我想要的是定义一个观察者列表,我会将其附加到可观察对象中,并且在调用 OnNext
时应通知这些观察者。但是,还有遗漏的部分。
- 当调用
myObservable::Subscribe()
函数时,我如何subscribe()
观察那些观察者 (Rx::subscribers<int>
)。 - 我还能怎样
unsubscribe()
。 - 最后,多个
onNext
观察者中对应的o.subscribe(onNext, onEnd);
会怎样?能不能构造一个对应的myObserver
class? (再次受到 here 的启发) 不好意思问,这样的组织有意义吗?到目前为止,我一直在使用此 tutorial 中提供的体系结构,这就是我沉迷于此任务的原因。我发现它是参与
RxCpp
的一种方式。非常感谢任何意见。(再次为我的无知感到抱歉。)class myObservable { private: std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers; public: myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); }; Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) { return Rx::observable<>::create<int>([&, out]() { auto it = observers->insert(observers->end(), *out); it->add([=]() { observers->erase(it); }); }); }; void OnNext(int sendItem) { for (Rx::subscriber<int> observer : *observers) { (observer).on_next(sendItem); } } void Disposer(Rx::subscriber<int> out) { observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end()); }; };
下面的 RxCpp 中的一个非常简单的例子。 不过(至少)有一个警告:典型的 RxCpp 代码大量使用 lambda,我非常不喜欢它。
我也试图在 Internet 上查找文档和教程,但找不到。我对线程模型的解释特别感兴趣。
如果您愿意仔细阅读代码和 Doxygen 文档,RxCpp GitHub 站点中有很多示例。
#include <iostream>
#include <exception>
#include "rxcpp/rx.hpp"
namespace rx = rxcpp;
static void onNext(int n) { std::cout << "* " << n << "\n"; }
static void onEnd() { std::cout << "* end\n"; }
static void onError(std::exception_ptr ep)
{
try { std::rethrow_exception(ep); }
catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
}
static void observableImpl(rx::subscriber<int> s)
{
s.on_next(1);
s.on_next(2);
s.on_completed();
}
int main()
{
auto o = rxcpp::observable<>::create<int>(observableImpl);
std::cout << "*\n";
o.subscribe(onNext, onEnd);
}