RxJava 事件总线

RxJava Event Bus

使用 RxJavaRxAndroid 的第一个版本,我将 class 作为 EventBus:

public class RxBus {
private static RxBus instance;
private PublishSubject<Object> subject = PublishSubject.create();

public static RxBus instanceOf() {
    if (instance == null) {
        instance = new RxBus();
    }
    return instance;
}

public void setMessage(Object object) {
    subject.onNext(object);
}

public Observable<Object> getEvents() {
    return subject;
}
}

在任何 class 中通过 instanceOf 获取实例 我使用 setMessage 方法发出消息并使用以下代码获取发出的消息:

  bus.getEvents().subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            if (o instanceof String) {
                //TODO
            }
        }
    });

Action1 来自 rx.functions 包。尝试迁移使用 RxJava 2 我无法导入它。

请告诉我,将 RxJava 2 用作 EventBus

的最短方法是什么

在 RxJava2 中,Action1 已重命名为 Consumer

The remaining action interfaces were named according to the Java 8 functional types. The no argument Action0 is replaced by the io.reactivex.functions.Action for the operators and java.lang.Runnable for the Scheduler methods. Action1 has been renamed to Consumer and Action2 is called BiConsumer. ActionN is replaced by the Consumer<Object[]> type declaration.

What's different in 2.0

这是RxJava2中事件总线的一个很好的实现(代码是从这里复制的gist

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class RxBus {
    private static volatile RxBus sRxBus = null;
    private PublishSubject<Object> mPublishSubject = PublishSubject.create();

    private RxBus() {
    }

    public static RxBus getInstance() {
        if (sRxBus == null) {
            synchronized (RxBus.class) {
                if (sRxBus == null) {
                    sRxBus = new RxBus();
                }
            }
        }
        return sRxBus;
    }

    public <T> Observable<T> subscribe(Class<T> cls) {
        return mPublishSubject
                .filter(o -> o.getClass().equals(cls))
                .map(o -> (T) o);
    }

    public void post(Object obj) {
        mPublishSubject.onNext(obj);
    }
}