Java 9 Flow 用 lambda 定义订阅者

Java 9 Flow define subscriber with lambdas

我开始玩 Java 9 Flow API,我发现但不喜欢的第一件事是,当我们将订阅者实现传递到发布者,因为我们可以用 RxJava

所以我必须定义和实现我自己的订阅者class

public class CustomSubscriber<T> implements Flow.Subscriber<T> {

        protected Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Subscription done:");
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            System.out.println("Got : " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    }

然后将其传递给我的发布者

   SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new CustomSubscriber<>());

这真的很冗长,据我了解,这是因为我们需要在 onSubscribe 回调

中设置订阅
protected Flow.Subscription subscription;     

稍后用于 onNext 以继续排放 subscription.request(1);

我仍然不明白为什么需要这种机制,但它避免了像我们在 RxJava 中所做的那样使用 Lambda,例如这个例子

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
        e -> System.out.println("do something in the onError"),
        () -> System.out.println("Do something in the onComplete"));

我想这是不可能的,我没有遗漏任何东西吧?

I still dont get it why this mechanism it´s needed

订阅使订阅者能够与发布者进行通信。 request 方法允许订阅者应用 backpressure,通知上游组件它已过载并 "needs a break"。为此,订阅者需要保留一个订阅实例,并且需要偶尔调用 request 以获取更多项目。

没有压力Subscriber

如果您有一个用例,其中您不需要应用背压并希望从降低的复杂性中获益,您可以实施 LaidBackSubscriber,其中:

  • 通过存储订阅并立即调用 request 实现 onSubscribe
  • 通过执行构造期间给定的 lambda 然后调用 subscription.request(1)
  • 来实现 onNext
  • 通过执行构造期间给定的 lambda 来实现 onErroronComplete

这应该能满足您的需求。

一般建议

Java9 Flow APIwas created 作为现有异步库的集成点,而不是邀请以临时方式实现反应式组件。试验起来很棒,但如果你真的想创建一个反应式系统,现有的库可能非常适合。

Java 9 Flow API 是一组准系统,包含 4 个接口和 1 个桥 class,从非反应性世界到反应性世界。没有运算符,没有方便的 lambda 版本,没有别的。

理论上,它的引入是为了允许 JDK 本身根据反应原理构建内部组件,但没有令人放心的迹象表明这种情况正在发生。

因此,用户有责任在此 API 上构建组件,这很困难、乏味且容易出错。您最好等待主流库发布兼容版本,或者坚持使用更可用的基于 Reactive-Streams.Org 的库,例如 RxJava 2 和 Reactor 3.

如果您仍然对手动构建 Flow API 感兴趣,可以查看我的 research/prototype 库 Reactive4JavaFlow, which has the desired lambda overload implemented