Collect results of 2 parallel Publishers in Flowable
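I'm quite new to reactive programming, but I'm trying to use RxJava to collect the lists of calls and SMS messages and combine them into one. The idea is to iterate over the cursors in parallel and emit the items sequentially.
After that, all items are collected into a single list.
However, by the time doOnComplete runs, not all of the data has been collected (I have about 150 items, and only ~25 get added).
I think the problem is that Flowable.concat does not wait for all the items, but that is only a guess.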
public void readLatestActivity() {
    Cursor callsCursor = getCallsCursor();
    Cursor smsCursor = getSmsCursor();
    if (callsCursor == null || smsCursor == null) {
        return;
    }
    SortedSet<MActivity> activities = new TreeSet<>((left, right) -> left.getReceiveTime() < right.getReceiveTime() ? 1 : 0);
    Flowable.concat(getCallsPublisher(callsCursor), getSmsPublisher(smsCursor))
            .subscribeOn(Schedulers.io())
            .doOnNext(new Consumer<MActivity>() {
                @Override
                public void accept(@NonNull MActivity mActivity) throws Exception {
                    if (mActivity != null) {
                        activities.add(mActivity);
                    }
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Timber.d("doOnComplete");
                    callsCursor.close();
                    smsCursor.close();
                    mBus.post(new EActivities(activities)); //<--- here I'm getting ~25 items
                }
            })
            .subscribe();
}

private Publisher<MActivity> getCallsPublisher(Cursor callsCursor) {
    return Flowable.fromIterable(RxCursorIterable.from(callsCursor))
            .take(callsCursor.getCount() < LIMIT ? callsCursor.getCount() : LIMIT)
            .subscribeOn(Schedulers.computation())
            .parallel()
            .runOn(Schedulers.computation())
            .map((Function<Cursor, MActivity>) this::readCallFromCursor)
            .sequential()
            .observeOn(AndroidSchedulers.mainThread());
}

private Publisher<MActivity> getSmsPublisher(Cursor smsCursor) {
    return Flowable.fromIterable(RxCursorIterable.from(smsCursor))
            .take(smsCursor.getCount() < LIMIT ? smsCursor.getCount() : LIMIT)
            .subscribeOn(Schedulers.computation())
            .parallel()
            .runOn(Schedulers.computation())
            .map((Function<Cursor, MActivity>) this::readSmsFromCursor)
            .sequential()
            .observeOn(AndroidSchedulers.mainThread());
}
The methods that read SMS messages and calls work fine, so I believe the problem lies elsewhere.
I also found a class that makes a Cursor behave like an Iterable:
public class RxCursorIterable implements Iterable<Cursor> {
    private Cursor mIterableCursor;

    public RxCursorIterable(Cursor c) {
        mIterableCursor = c;
    }

    public static RxCursorIterable from(Cursor c) {
        return new RxCursorIterable(c);
    }

    @Override
    public Iterator<Cursor> iterator() {
        return RxCursorIterator.from(mIterableCursor);
    }

    private static class RxCursorIterator implements Iterator<Cursor> {
        private final Cursor mCursor;

        public RxCursorIterator(Cursor cursor) {
            mCursor = cursor;
        }

        public static Iterator<Cursor> from(Cursor cursor) {
            return new RxCursorIterator(cursor);
        }

        @Override
        public boolean hasNext() {
            return !mCursor.isClosed() && mCursor.moveToNext();
        }

        @Override
        public Cursor next() {
            return mCursor;
        }
    }
}
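I think the problem is the comparator you pass when creating the TreeSet. It never returns -1 when left.getReceiveTime() > right.getReceiveTime(); it returns 0 instead. A TreeSet treats any two elements whose comparison yields 0 as equal, so some activities are wrongly considered duplicates and add() silently drops them.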
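To see the effect in isolation, here is a minimal sketch in plain Java. The Item class is a hypothetical stand-in for MActivity (only the timestamp is modelled), and the sample timestamps are made up. It counts how many elements a TreeSet keeps with the comparator from the question versus one based on Long.compare that orders newest first.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class ComparatorDemo {

    // Hypothetical stand-in for MActivity; only the timestamp matters here.
    static final class Item {
        private final long receiveTime;

        Item(long receiveTime) {
            this.receiveTime = receiveTime;
        }

        long getReceiveTime() {
            return receiveTime;
        }
    }

    // The comparator from the question: it never returns a negative value.
    static final Comparator<Item> BROKEN =
            (left, right) -> left.getReceiveTime() < right.getReceiveTime() ? 1 : 0;

    // Newest first, with all three outcomes (negative, zero, positive).
    static final Comparator<Item> FIXED =
            (left, right) -> Long.compare(right.getReceiveTime(), left.getReceiveTime());

    // Adds one Item per timestamp and reports how many the set kept.
    static int countKept(Comparator<Item> comparator, long... receiveTimes) {
        SortedSet<Item> set = new TreeSet<>(comparator);
        for (long time : receiveTimes) {
            set.add(new Item(time));
        }
        return set.size();
    }

    public static void main(String[] args) {
        long[] times = {50, 40, 60, 30, 45};
        System.out.println("broken: " + countKept(BROKEN, times)); // prints "broken: 3"
        System.out.println("fixed:  " + countKept(FIXED, times));  // prints "fixed:  5"
    }
}
```

With the broken comparator, an element is kept only if it is older than everything already in the set, which would explain why only a fraction of the items survive. Note that even the fixed comparator collapses two activities with exactly the same receive time; if that can happen in your data, add a tie-breaker on another field (for example an id).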
I would also suggest simplifying your getCallsPublisher code like this:
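return Flowable.<MActivity, Cursor>generate(
        this::getCallsCursor,   // generate() expects a factory for the state, not the Cursor itself
        (cursor, emitter) -> {
            // Emit at most one item per call, or signal completion.
            if (!cursor.isClosed() && cursor.moveToNext()) {
                emitter.onNext(readCallFromCursor(cursor));
            } else {
                emitter.onComplete();
            }
        },
        Cursor::close)          // closes the cursor when the flow terminates or is cancelled
    .take(LIMIT);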
RxCursorIterable can then be removed.