使用 Reactivex 的消费者接口

Using Consumer interface of Reactivex

我是 ReactiveX 的新手。我是通过阅读源代码来学习它的。一切都那么清晰,但突然间我得到了一个名为 "Consumer" 的词,它是一个接口。它被用来代替观察者。

谁能告诉我它到底是做什么的?

我点击了几个链接,但它们都只说了一个语句Consumer 是一个接受单个值的功能接口(回调)。

我想知道它的确切工作原理。

  1. 这是什么?
  2. 我们为什么需要它?
  3. 如何使用它?
  4. 它会取代Observer吗?如果是,如何以及为什么?

Consumer 是一个简单的 Java 接口,它接受 T 类型的变量。就像你说的,它用于回调。

示例:

import io.reactivex.functions.Consumer;

Flowable.just("Hello world").subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

为什么有效?我们如何使用消费者而不是观察者?

RxJava 只是创建一个观察者,将消费者传递给它,然后在 onNext

中调用它

更新

LambdaObserver 是一种观察者,由四个功能接口创建并将它们用作回调。它主要用于使用 java 8 lambda expressions。它看起来像这样:

Observable.just(new Object())
                .subscribe(
                        o -> processOnNext(o),
                        throwable -> processError(throwable),
                        () -> processCompletion(),
                        disposable -> processSubscription()
                );

A Consumer 消耗您在订阅时收到的值。它就像一个 Subscriber 将发出的数据作为回调传递。

Consumer 是一个简单的接口,它具有通用类型的回调,并且需要接收 Observable 发出的项目。

请注意,如果您只有一个消费者,您不会捕获错误并且可能会在调试时遇到问题。

您可以通过使用另一个 Consumer 作为接收 Throwable 的第二个参数来解决这个问题。

Flowable.just("Hello world")
  .subscribe(
            emittedData -> System.out.println(emittedData), // onNext
            throwable -> throwable.printStackTrace() // onError
);

以我的愚见,消费者用于反向/双向流。

例如,您的数据源发出 "Y" 复杂的时间相关操作,从参数 "X" 以 "hot" 可流动 (HF) 的形式执行。

假设参数 X 是通过 "hot" 可观察对象 (HO) 发出的,那么,您的数据源可以是订阅 "HO" 并通过发出复杂操作结果的消费者高频

在这种情况下,您拥有双向流,并且使用消费者推送通过数据源中的 HO 提供的日期。

我不确定我的回答是否真的正确... rx 有点复杂:B