如何创建无限项目流的可观察对象

How to create an observable of a stream of infinite items

我有一个可以无限发射物品的发射器。如何在 RxJava 2 中将发射器发出的项目流转换为 Observable(或同类)。

您要流式传输数据吗?假设我正在尝试从数据库中传输数据。

return Observable.using(
            () -> getQueryConnectionSubscription(sql),

            connectionSubscription -> Observable.create((subscriber) -> {

                ResultSet resultSet = connectionSubscription.getResultSet();
                int rowNumber = 0;
                while (!subscriber.isDisposed() && resultSet.next()) {

                    T row = rowMapper.mapRow(resultSet, rowNumber);
                    subscriber.onNext(row);
                }
                subscriber.onComplete();

            }),

            (queryConnectionSubscription) -> {
                queryConnectionSubscription.close();
            });

我不确定您的数据来源是什么。但是只要你有数据,你就会一直调用 subscriber.onNext(data)。 如果您想要完整的详细信息,请查看 link https://www.developerthoughtsonline.com/2019/02/02/streaming-with-reactive-java-and-spring-jdbctemplate/

为了解决这个问题,我选择了 Subject 而不是 Observable。下面是代码示例。

public class ItemEmitter {
  private BehaviorSubject<Object> subject = BehaviorSubject.create();

  public void onEvent(Object item) {
    subject.onNext(item);
  }

  public Flowable<Object> getObservable() {
    return subject.toFlowable(BackpressureStrategy.LATEST);
  }
}