如何创建无限项目流的可观察对象
How to create an observable of a stream of infinite items
我有一个可以无限发射物品的发射器。如何在 RxJava 2 中将发射器发出的项目流转换为 Observable(或同类)。
您要流式传输数据吗?假设我正在尝试从数据库中传输数据。
return Observable.using(
() -> getQueryConnectionSubscription(sql),
connectionSubscription -> Observable.create((subscriber) -> {
ResultSet resultSet = connectionSubscription.getResultSet();
int rowNumber = 0;
while (!subscriber.isDisposed() && resultSet.next()) {
T row = rowMapper.mapRow(resultSet, rowNumber);
subscriber.onNext(row);
}
subscriber.onComplete();
}),
(queryConnectionSubscription) -> {
queryConnectionSubscription.close();
});
我不确定您的数据来源是什么。但是只要你有数据,你就会一直调用 subscriber.onNext(data)。
如果您想要完整的详细信息,请查看 link
https://www.developerthoughtsonline.com/2019/02/02/streaming-with-reactive-java-and-spring-jdbctemplate/
为了解决这个问题,我选择了 Subject 而不是 Observable。下面是代码示例。
public class ItemEmitter {
private BehaviorSubject<Object> subject = BehaviorSubject.create();
public void onEvent(Object item) {
subject.onNext(item);
}
public Flowable<Object> getObservable() {
return subject.toFlowable(BackpressureStrategy.LATEST);
}
}
我有一个可以无限发射物品的发射器。如何在 RxJava 2 中将发射器发出的项目流转换为 Observable(或同类)。
您要流式传输数据吗?假设我正在尝试从数据库中传输数据。
return Observable.using(
() -> getQueryConnectionSubscription(sql),
connectionSubscription -> Observable.create((subscriber) -> {
ResultSet resultSet = connectionSubscription.getResultSet();
int rowNumber = 0;
while (!subscriber.isDisposed() && resultSet.next()) {
T row = rowMapper.mapRow(resultSet, rowNumber);
subscriber.onNext(row);
}
subscriber.onComplete();
}),
(queryConnectionSubscription) -> {
queryConnectionSubscription.close();
});
我不确定您的数据来源是什么。但是只要你有数据,你就会一直调用 subscriber.onNext(data)。 如果您想要完整的详细信息,请查看 link https://www.developerthoughtsonline.com/2019/02/02/streaming-with-reactive-java-and-spring-jdbctemplate/
为了解决这个问题,我选择了 Subject 而不是 Observable。下面是代码示例。
public class ItemEmitter {
private BehaviorSubject<Object> subject = BehaviorSubject.create();
public void onEvent(Object item) {
subject.onNext(item);
}
public Flowable<Object> getObservable() {
return subject.toFlowable(BackpressureStrategy.LATEST);
}
}