RxAndroid. Simple caching and filtering data

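I have read a lot of material about Rx. On the one hand everything is clear, but on the other hand nothing is. I am trying to use this library for simple filtering and storing of data from a server, but it does not work as expected. I need to implement simple channel list management: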

1. Cache on disk and in memory
2. When the user requests it, return the filtered channels
3. All Subscribers attached to this Observable must be notified if the filtered channel list changes (call onNext() for all Subscribers)

I wrote the following:

ArrayList<Channel> channels = null;
ArrayList<Channel> filteredChannels = null;

Observable<ArrayList<Channel>> filteredObservable = Observable.create()
// here I need to check if cache is valid, if no - download from server
// then filter channels and return filteredChannels in onNext call
// all subscribers that called subscribe before and not called unsubscribe must be notified if filteredChannels changed
// how to implement this?
.subscribeOn(Schedulers.io());

public void setFilter(Filter filter) {
// update filteredChannels with the new filter and notify all Subscribers (call onNext()). How to implement this?
}

public Observable<ArrayList<Channel>> getFilteredChannels() {
    return filteredObservable;
}

Do I understand the logic of the Rx pattern correctly? Thanks in advance.

I would use .map() to filter your list and return the output you want. That way your Observable emits the filtered result.

You can then write the filtered list to the cache in your .subscribe() function.
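The idea of keeping the full list cached and treating the filter as a mapping step can be sketched without the Rx dependency. The following is only an illustration in plain Java, not RxJava code: `ChannelCache`, the `hd` field, the time-to-live check, and the `Supplier` that stands in for the server call are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class ChannelCacheSketch {
    /** Hypothetical stand-in for the Channel type from the question. */
    static final class Channel {
        final String name;
        final boolean hd;
        Channel(String name, boolean hd) { this.name = name; this.hd = hd; }
    }

    /** In-memory cache with a time-to-live; the loader stands in for the server call. */
    static final class ChannelCache {
        private final Supplier<List<Channel>> loader;
        private final long ttlMillis;
        private List<Channel> cached;
        private long loadedAt;
        int loads = 0; // exposed only so the example can show how often the loader ran

        ChannelCache(Supplier<List<Channel>> loader, long ttlMillis) {
            this.loader = loader;
            this.ttlMillis = ttlMillis;
        }

        /** Returns the cached list, reloading it first if it is missing or stale. */
        synchronized List<Channel> channels() {
            long now = System.currentTimeMillis();
            if (cached == null || now - loadedAt > ttlMillis) {
                cached = new ArrayList<>(loader.get());
                loadedAt = now;
                loads++;
            }
            return cached;
        }

        /** The "map" step: turns the full list into a new filtered list. */
        List<Channel> filtered(Predicate<Channel> filter) {
            List<Channel> result = new ArrayList<>();
            for (Channel c : channels()) {
                if (filter.test(c)) result.add(c);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        ChannelCache cache = new ChannelCache(
                () -> Arrays.asList(new Channel("News", false), new Channel("Sport", true)),
                60_000);
        List<Channel> hdOnly = cache.filtered(c -> c.hd);
        List<Channel> all = cache.filtered(c -> true);
        // prints "1 HD of 2, loads=1": the second call is served from the cache
        System.out.println(hdOnly.size() + " HD of " + all.size() + ", loads=" + cache.loads);
    }
}
```

In an Rx pipeline the body of `filtered` would sit inside `.map()`, and the cache write would happen where the loader result is received.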

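I found a solution for this task: I needed to use flatMap and map. I also tried to use BehaviorSubject to notify all subscribers when the variable changes, but that solution was fundamentally unstable because of MissingBackpressureException. That is why I wrote the following ObservableValue class: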

import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;

public class ObservableValue<T> {
    private static final String TAG = ObservableValue.class.getSimpleName();

    private ArrayList<Registration> subscriptions = new ArrayList<>();
    private T value;
    private int index;
    private boolean firstNotify;
    private ReentrantLock lock = new ReentrantLock();

    public ObservableValue() {
        firstNotify = false;
    }

    public ObservableValue(T defaultValue) {
        value = defaultValue;
        firstNotify = true;
    }

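    @SuppressWarnings("unchecked")
    public void setValue(T value) {
        lock.lock();
        try {
            this.value = value;
            // Iterate over a snapshot so that a listener may call finish() from its
            // callback without causing a ConcurrentModificationException.
            for (Registration listener : new ArrayList<>(subscriptions)) {
                if (!listener.isFinished()) {
                    listener.getListener().valueChanged(value);
                }
            }
        } finally {
            lock.unlock(); // always release the lock, even if a listener throws
        }
    }

    public T getValue() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    public Registration subscribe(ValueChangeListener<T> listener) {
        lock.lock();
        try {
            Registration s = new Registration(this, index, listener);
            index++;
            subscriptions.add(s);
            // Replay the current value to the new listener, if there is one.
            if (firstNotify || value != null) {
                listener.valueChanged(value);
            }
            return s;
        } finally {
            lock.unlock();
        }
    }

    protected void finish(int index) {
        lock.lock();
        try {
            // Remove the registration with the matching index, if it is still present.
            for (int i = 0; i < subscriptions.size(); i++) {
                if (subscriptions.get(i).getIndex() == index) {
                    subscriptions.remove(i);
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }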
}

import android.os.Handler;
import android.os.Looper;

public abstract class ValueChangeListener<T> {
    private Looper looper;

    public ValueChangeListener() {}

    public ValueChangeListener(Looper looper) {
        this.looper = looper;
    }

    public void valueChanged(T data) {
        if (looper != null) {
            // Deliver the callback on the thread that owns the given Looper.
            new Handler(looper).post(() -> onValueChanged(data));
        } else {
            // No Looper given: deliver on the calling thread.
            onValueChanged(data);
        }
    }

    public abstract void onValueChanged(T data);
}

public class Registration {
    private ObservableValue observableValue;
    private int index;
    private ValueChangeListener listener;
    private volatile boolean finished = false;

    protected Registration(ObservableValue observableValue, int index, ValueChangeListener listener) {
        this.observableValue = observableValue;
        this.index = index;
        this.listener = listener;
    }

    protected ValueChangeListener getListener() {
        return listener;
    }

    protected int getIndex() {
        return index;
    }

    public boolean isFinished() {
        return finished;
    }

    public void finish() {
        finished = true;
        observableValue.finish(index);
    }
}

Now everything works fine. As in Rx, ObservableValue returns a Registration object, and finish() must be called on it when it is no longer needed.
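The subscribe, notify, and finish lifecycle described above can be tried out with a simplified, stdlib-only stand-in. This is a sketch under assumptions, not the ObservableValue class itself: `LatestValue`, `FilteredChannelsSketch`, and the use of channel names instead of Channel objects are hypothetical, there is no Looper handling, and it is not hardened against concurrent setFilter calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class FilteredChannelsSketch {
    /** Holds the latest value, replays it to new listeners, and pushes every update. */
    static final class LatestValue<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
        private volatile T value;

        void set(T newValue) {
            value = newValue;
            for (Consumer<T> l : listeners) l.accept(newValue);
        }

        /** Registers a listener; run the returned handle to stop receiving updates. */
        Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            T current = value;
            if (current != null) listener.accept(current);
            return () -> listeners.remove(listener);
        }
    }

    private final List<String> channels; // channel names stand in for Channel objects
    private final LatestValue<List<String>> filtered = new LatestValue<>();

    FilteredChannelsSketch(List<String> channels) {
        this.channels = new ArrayList<>(channels);
        setFilter(name -> true); // start with an unfiltered list
    }

    /** Recomputes the filtered list and notifies every active listener. */
    void setFilter(Predicate<String> filter) {
        List<String> result = new ArrayList<>();
        for (String name : channels) {
            if (filter.test(name)) result.add(name);
        }
        filtered.set(result);
    }

    Runnable subscribe(Consumer<List<String>> listener) {
        return filtered.subscribe(listener);
    }

    public static void main(String[] args) {
        FilteredChannelsSketch store =
                new FilteredChannelsSketch(Arrays.asList("News", "Sport", "Nature"));
        List<List<String>> seen = new ArrayList<>();
        Runnable registration = store.subscribe(seen::add); // current list is replayed at once
        store.setFilter(name -> name.startsWith("N"));      // delivered to the listener
        registration.run();                                 // plays the role of finish()
        store.setFilter(name -> name.startsWith("S"));      // no longer delivered
        // prints [[News, Sport, Nature], [News, Nature]]
        System.out.println(seen);
    }
}
```

The handle returned by `subscribe` corresponds to the Registration object: once it has been run, later filter changes no longer reach that listener.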