如何在 RxAndroid 2.0 中正确 filter/count?

How to properly filter/count in RxAndroid 2.0?

给定一个 returns 可观察的账单列表的存储库:

Observable<List<Bill>> getBills();

我希望仅在未支付一张或多张账单时显示视图。我正在尝试以下代码:

repository.getBills()
    .flatMapIterable(bills -> bills)
    .filter(bill -> !bill.isPaid())
    .count()
    .map(count -> count > 0)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(overdue -> {
        if (!overdue) return;
        mView.showWarning();
    });

onSuccessonError 都没有被调用。

我知道存储库至少包含一个逾期项目,因为以下代码打印未支付的账单:

repository.getBills()
    .subscribeOn(Schedulers.io())
    .flatMapIterable(bills -> bills)
    .filter(bill -> !bill.isPaid())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        bill -> Timber.d(bill.toString()),
        e -> Timber.e(e.getMessage(), e),
        () -> Timber.d("Completed")
    );

TL;DR:
如果它永远不会完成,则 count 不起作用。如果要检查未付款项,可以使用 any 运算符、takeUntiltakeWhile。跳转到此答案中的第三项。

完整答案:

可能存在三种问题:

  1. 可能发生在showWarning()里面。我 运行 下面的代码和 它打印到期:

    findViewById(R.id.doSomething).setOnClickListener(v -> {
        clearWarning();
        getBills()
            .subscribeOn(Schedulers.io())
            .flatMapIterable(bills -> bills)
            .filter(bill -> !bill.isPaid())
            .count()
            .map(count -> count > 0)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                due -> {
                  if (!due) return;
                  showWarning();
                }
            );
    });
    

    具有以下getBills()

    private Observable<List<Bill>> getBills() {
        Bill sampleBill = new Bill();
        List<Bill> bills = new ArrayList<>(1);
        bills.add(sampleBill);
        return Observable.just(bills);
    }
    

    Bill 是一个虚拟 class 仅 returning falseisPaid():

    class Bill {
        public boolean isPaid() {
            return false;
        }
    }
    

    我有一个 TextView 用于 showWarning()clearWarning() 并且它 正确打印 "Due"

  2. 另一个选项是您 getBills() 内部的问题。是否 来源成功完成(我的意思是,它调用 onComplete())?您可以手动调用它或使用 Single,但您需要在 flatMapIterable().

    之前调用 toObservable()

    根据documentation

    If the source Observable terminates with an error, Count will pass this error notification along without emitting an item first. If the source Observable does not terminate at all, Count will neither emit an item nor terminate.

  3. 如果你不能改变getBills()中的Observable,但只需要 检测何时有未支付的账单,您可以使用 takeWhiletakeUntilany:

    findViewById(R.id.doSomething).setOnClickListener(v -> {
      clearWarning();
      getBills()
          .flatMapIterable(bills -> bills)
          .takeUntil(bill -> !bill.isPaid())
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(
              bill -> {
                Log.d("POTATO", "Number: " + bill.getNumber() + " Paid: " + bill.isPaid());
              },
              e -> Log.e("POTATO", "Error"),
              () -> {
                Log.d("POTATO", "Complete");
                showWarning();
              }
          );
    });   }
    

    对于此示例,我将 getBills() 更改为永不完成:

     private Observable<List<Bill>> getBills() {
        List<Bill> bills = new ArrayList<>();
        bills.add(new Bill(1, true));
        bills.add(new Bill(2, true));
        bills.add(new Bill(3, false));
        bills.add(new Bill(4, false));
        return Observable.create(
            emitter -> emitter.onNext(bills)
        );
      }
    

    为了显示发射的是哪个项目,现在 Bill class 是 如下:

    class Bill {
    
      private final int number;
      private boolean isPaid;
    
      Bill(int number, boolean isPaid) {
        this.number = number;
        this.isPaid = isPaid;
      }
    
      int getNumber() {
        return number;
      }
    
      boolean isPaid() {
        return isPaid;
      }
    }
    

    Log 打印

    数量:1 支付:true
    数量:2 付费:true
    人数:3 付费:false
    完成

    然后调用 showWarning()takeWhile 当然应该具有与 takeUntil 相反的 return 值。两者都会获取付费项目并在有未付费项目时停止,但 takeWhile 甚至不会发出未付费项目(Number: 3: Paid: false 不会出现在日志中,但它会在 2 后立即完成) . any 将获得任何满足条件的物品,这对您来说可能就足够了。请注意,这是一个完全不同的解决方案。如果源只发出付费项目,它将永远不会完成。但是无论如何你应该在某个地方有一个unsubscribe

这是实际有效的完整示例。您的 getBills 方法可能有问题。最常见的是缺少 source.onComplete() 呼叫。

    //...
    getBills()
            .flatMapIterable(bills -> bills)
            .filter(bill -> !bill.isPaid)
            .count()
            .map(count -> count > 0)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::onNext, this::onError);

public void onNext(boolean b) {
    Log.d("TAG", "Should show:" + b);
}

public void onError(Throwable throwable) {
    Log.d("TAG", throwable.getMessage());
}

public Observable<List<Bill>> getBills() {
    return Observable.create(source -> {
        try {
            List<Bill> bills = new ArrayList<>();
            bills.add(new Bill(true));
            bills.add(new Bill(true));
            bills.add(new Bill(false));
            bills.add(new Bill(false));
            source.onNext(bills);
            source.onComplete();
        }
        catch (Throwable throwable) {
            source.onError(throwable);
        }
    });
}

public static class Bill {
    public boolean isPaid;

    public Bill(boolean isPaid) {
        this.isPaid = isPaid;
    }
}

如果您不需要计数,最好使用 any 运算符,如果源 Observable 发出的任何项目满足指定条件,则发出 true,否则为 false。

所以这段代码

.filter(bill -> !bill.isPaid())
.count()
.map(count -> count > 0).filter(bill -> !bill.isPaid())
.count()
.map(count -> count > 0)

应替换为

.any(bill -> !bill.isPaid())

警告 在某些版本的 RxJava 中,any 运算符可以命名为 exists