RxJava门机制

RxJava gate mechanism

我想实现门机制之类的东西。 我需要一个 PublishSubject 和几个订阅者。当 PublishSubject 通过 onNext 发送数据时,只有 一个订阅者 会收到它。

例如: 我在选项卡内有 3 等于片段。他们订阅 到名为 onLoginPublisher 的全局发布。 当 onResumeonPause 调用时 gate 变为打开或关闭。 当调用 onLogin 并且由于屏幕上没有这些片段之一而没有打开任何门时,onNext 将等待片段的 onResume

看图:

您可以对门的状态使用 filter。例如,您可以将所有逻辑包装到 class:

public final class GatedSubject<T> {
    final PublishSubject<T> subject = PublishSubject.create();
    final AtomicReferenceArray<Boolean> gates;

    public GatedSubject(int numGates) {
        gates = new AtomicReferenceArray<>(numGates);
    }

    public boolean getGateStatus(int gateIndex) {
        return gates.get(gateIndex) != null;
    }

    public void setGateStatus(int gateIndex, boolean status) {
        gates.set(gateIndex, status ? Boolean.TRUE : null);
    }

    public void Observable<T> getGate(int gateIndex) {
        return subject.filter(v -> getGateStatus(gateIndex));
    }

    public void onNext(T item) {
        subject.onNext(item);
    }

    public void onError(Throwable error) {
        subject.onError(error);
    }

    public void onComplete() {
        subject.onComplete();
    }
}