RxJava- 一个单一的热源 Observable?好主意还是坏主意?
RxJava- A single Hot Source Observable? Good or Bad idea?
我很欣赏 RxJava 可以处理的所有复杂性,从处理用户事件到大型复杂的反应式算法结构。但是,我仍然在为源 Observable
的开始和所有事件的起源以及如何管理多个事件起源而苦苦挣扎。
我知道这个问题的答案可能是 "it depends",但是将源事件合并到单个 Subject
或某种形式的 Hot Observable 是个坏主意吗?
例如,假设我有一个数据驱动的桌面应用程序。我有一个可枚举的标识根级事件类型。
public enum AppEvent {
TRANSACTIONS_REFRESH,
CATEGORIES_REFRESH
}
然后我有一个 AppEventEngine
单例,它包含一个 PublishSubject
并提供一种方法来处理 post 事件和访问 Observable<AppEvent>
。
public final class AppEventEngine {
private static final AppEventEngine instance = new AppEventEngine();
private final PublishSubject<AppEvent> eventBus = PublishSubject.create();
public void post(AppEvent appEvent) {
eventBus.onNext(appEvent);
}
public Observable<AppEvent> getEvents(AppEvent appEvent) {
return eventBus.startWith(appEvent).filter(e -> e.equals(appEvent));
}
public static AppEventEngine get() {
return instance;
}
private AppEventEngine() {}
}
然后我可以将 AppEvent.CATEGORIES_REFRESH
发送到 CategoryManager
并使用 RxJava-JDBC 构造和发送从查询派生的新 List<Category>
。
public final class CategoryManager {
private final Database db = //instantiate database;
private final Observable<List<Category>> categories;
private CategoryManager() {
categories = AppEventEngine.get().getEvents(AppEvent.CATEGORIES_REFRESH)
.flatMap(e -> db.select("SELECT * FROM CATEGORY").get(rs -> new Category(rs.getInt("ID"), rs.getString("DESC"))).toList()).cache(1);
}
public Observable<List<Category>> getCategories() {
return categories;
}
}
那么外部客户端类不仅可以改造订阅那个Observable<List<Category>>
,还可以随时推送AppEvent.CATEGORIES_REFRESH
事件。
public final class SomeClientUX {
//gui code
public void categoryRefreshButtonPressed() {
AppEventEngine.get().post(AppEvent.CATEGORIES_REFRESH);
}
}
我的问题是...遵循这种 EventBus
模式 (much like that in Google Guava) 是一种编写 RxJava 桌面应用程序的合乎规范的方式吗?或者我应该避免使用 PublishSubject
并以不同的方式(以分散的方式)导出事件?
我认为这是一个有效的模式,并且是 PublishSubject
的常见用例。不过,有一点建议是,如果您希望事件从不同的线程被推送到它,那么序列化来自 PublishSubject
的排放(除非被推送的事件之间存在正式的先行发生关系,但这是您可能不会进行的优化不需要)。所以声明应该是:
private final Subject<AppEvent> eventBus =
PublishSubject.create().toSerialized();
我很欣赏 RxJava 可以处理的所有复杂性,从处理用户事件到大型复杂的反应式算法结构。但是,我仍然在为源 Observable
的开始和所有事件的起源以及如何管理多个事件起源而苦苦挣扎。
我知道这个问题的答案可能是 "it depends",但是将源事件合并到单个 Subject
或某种形式的 Hot Observable 是个坏主意吗?
例如,假设我有一个数据驱动的桌面应用程序。我有一个可枚举的标识根级事件类型。
public enum AppEvent {
TRANSACTIONS_REFRESH,
CATEGORIES_REFRESH
}
然后我有一个 AppEventEngine
单例,它包含一个 PublishSubject
并提供一种方法来处理 post 事件和访问 Observable<AppEvent>
。
public final class AppEventEngine {
private static final AppEventEngine instance = new AppEventEngine();
private final PublishSubject<AppEvent> eventBus = PublishSubject.create();
public void post(AppEvent appEvent) {
eventBus.onNext(appEvent);
}
public Observable<AppEvent> getEvents(AppEvent appEvent) {
return eventBus.startWith(appEvent).filter(e -> e.equals(appEvent));
}
public static AppEventEngine get() {
return instance;
}
private AppEventEngine() {}
}
然后我可以将 AppEvent.CATEGORIES_REFRESH
发送到 CategoryManager
并使用 RxJava-JDBC 构造和发送从查询派生的新 List<Category>
。
public final class CategoryManager {
private final Database db = //instantiate database;
private final Observable<List<Category>> categories;
private CategoryManager() {
categories = AppEventEngine.get().getEvents(AppEvent.CATEGORIES_REFRESH)
.flatMap(e -> db.select("SELECT * FROM CATEGORY").get(rs -> new Category(rs.getInt("ID"), rs.getString("DESC"))).toList()).cache(1);
}
public Observable<List<Category>> getCategories() {
return categories;
}
}
那么外部客户端类不仅可以改造订阅那个Observable<List<Category>>
,还可以随时推送AppEvent.CATEGORIES_REFRESH
事件。
public final class SomeClientUX {
//gui code
public void categoryRefreshButtonPressed() {
AppEventEngine.get().post(AppEvent.CATEGORIES_REFRESH);
}
}
我的问题是...遵循这种 EventBus
模式 (much like that in Google Guava) 是一种编写 RxJava 桌面应用程序的合乎规范的方式吗?或者我应该避免使用 PublishSubject
并以不同的方式(以分散的方式)导出事件?
我认为这是一个有效的模式,并且是 PublishSubject
的常见用例。不过,有一点建议是,如果您希望事件从不同的线程被推送到它,那么序列化来自 PublishSubject
的排放(除非被推送的事件之间存在正式的先行发生关系,但这是您可能不会进行的优化不需要)。所以声明应该是:
private final Subject<AppEvent> eventBus =
PublishSubject.create().toSerialized();