如何在 Android 中创建一个 Observable?

How to create an Observable in Android?

我想做的是创建一个简单的内存缓存来尝试 Observables。但是我卡住了,因为我不明白如何创建可观察对象。这是我到目前为止得到的代码:

public class MovieCache {
    MovieWrapper movieWrapper;

    public Observable<MovieWrapper> getMovies() {
       //How to create and return an Observable<MovieWrapper> here?
    }

    public void setCache(MovieWrapper wrapper) {
        movieWrapper = wrapper;
    }

    public void clearCache() {
        movieWrapper = null;
    }
}

getMovies() 方法中,我想为订阅者创建一个 Observable 和 return 我的本地字段 movieWrapper。我怎样才能做到这一点?我尝试使用 new Observable.just(movieWrapper) 但它导致空异常。

看看 this tutorial,因为它完全符合您的要求。基本上你使用 defer() 来确保你总是获得缓存对象的最新版本:

public class MovieCache {
    MovieWrapper movieWrapper;

    public Observable<MovieWrapper> getMovies() {  
        return Observable.defer(new Func0<Observable<MovieWrapper>>() {
            @Override
            public Observable<MovieWrapper> call() {
                return Observable.just(movieWrapper);
            }
        });
    }

    public void setCache(MovieWrapper wrapper) {
        movieWrapper = wrapper;
    }

    public void clearCache() {
        movieWrapper = null;
    }
}

defer() 确保您将在 订阅 时获得对象 Observable 而不是 创建 .

但是请注意,根据 post 的作者:

The only downside to defer() is that it creates a new Observable each time you get a subscriber. create() can use the same function for each subscriber, so it's more efficient. As always, measure performance and optimize if necessary.

如前所述,接受的答案有缺点

it creates a new Observable each time you get a subscriber

但它不是唯一的。

  • 如果消费者在调用 setCache(...) 之前调用 getMovies().subscribe(...),他将不会收到任何值。
  • 如果消费者想收到任何更新,他应该重新订阅(假设 setCache() 可以多次调用。

当然,所有这些都可能与您的场景无关。我只想向您展示另一种方式(我相信还有更多)。 您可以使用 BehaviorSubject 来消除所有这些缺点。

public class MovieCache {
    private BehaviorSubject<MovieWrapper> mMovieCache = BehaviorSubject.create();

    public void setCache(MovieWrapper wrapper) {
        mMovieCache.onNext(wrapper);
    }

    public Observable<MovieWrapper> getMovieObservable() {
        //use this if consumer want to receive all updates
        return mMovieCache.asObservable();
    }

    public MovieWrapper getMovie() {
        //use this if consumer want to get only current value
        //and not interested in updates
        return mMovieCache.getValue();
    }

    public void clearCache() {
        //CAUTION consumer should be ready to receive null value
        mMovieCache.onNext(null);
        //another way is to call mMovieCache.onCompleted();
        //in this case consumer should be ready to resubcribe
    }

    public static class MovieWrapper {}

}

看看BehaviorSubject marble diagram