Java 9 Flow 用 lambda 定义订阅者
Java 9 Flow define subscriber with lambdas
我开始玩 Java 9 Flow API,我发现但不喜欢的第一件事是,当我们将订阅者实现传递到发布者,因为我们可以用 RxJava
所以我必须定义和实现我自己的订阅者class
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
然后将其传递给我的发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());
这真的很冗长,据我了解,这是因为我们需要在 onSubscribe
回调
中设置订阅
protected Flow.Subscription subscription;
稍后用于 onNext
以继续排放 subscription.request(1);
我仍然不明白为什么需要这种机制,但它避免了像我们在 RxJava 中所做的那样使用 Lambda,例如这个例子
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));
我想这是不可能的,我没有遗漏任何东西吧?
I still dont get it why this mechanism it´s needed
订阅使订阅者能够与发布者进行通信。 request
方法允许订阅者应用 backpressure,通知上游组件它已过载并 "needs a break"。为此,订阅者需要保留一个订阅实例,并且需要偶尔调用 request
以获取更多项目。
没有压力Subscriber
如果您有一个用例,其中您不需要应用背压并希望从降低的复杂性中获益,您可以实施 LaidBackSubscriber
,其中:
- 通过存储订阅并立即调用
request
实现 onSubscribe
- 通过执行构造期间给定的 lambda 然后调用
subscription.request(1)
来实现 onNext
- 通过执行构造期间给定的 lambda 来实现
onError
和 onComplete
这应该能满足您的需求。
一般建议
Java9 Flow APIwas created 作为现有异步库的集成点,而不是邀请以临时方式实现反应式组件。试验起来很棒,但如果你真的想创建一个反应式系统,现有的库可能非常适合。
Java 9 Flow API 是一组准系统,包含 4 个接口和 1 个桥 class,从非反应性世界到反应性世界。没有运算符,没有方便的 lambda 版本,没有别的。
理论上,它的引入是为了允许 JDK 本身根据反应原理构建内部组件,但没有令人放心的迹象表明这种情况正在发生。
因此,用户有责任在此 API 上构建组件,这很困难、乏味且容易出错。您最好等待主流库发布兼容版本,或者坚持使用更可用的基于 Reactive-Streams.Org 的库,例如 RxJava 2 和 Reactor 3.
如果您仍然对手动构建 Flow API 感兴趣,可以查看我的 research/prototype 库 Reactive4JavaFlow, which has the desired lambda overload implemented。
我开始玩 Java 9 Flow API,我发现但不喜欢的第一件事是,当我们将订阅者实现传递到发布者,因为我们可以用 RxJava
所以我必须定义和实现我自己的订阅者class
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
然后将其传递给我的发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());
这真的很冗长,据我了解,这是因为我们需要在 onSubscribe
回调
protected Flow.Subscription subscription;
稍后用于 onNext
以继续排放 subscription.request(1);
我仍然不明白为什么需要这种机制,但它避免了像我们在 RxJava 中所做的那样使用 Lambda,例如这个例子
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));
我想这是不可能的,我没有遗漏任何东西吧?
I still dont get it why this mechanism it´s needed
订阅使订阅者能够与发布者进行通信。 request
方法允许订阅者应用 backpressure,通知上游组件它已过载并 "needs a break"。为此,订阅者需要保留一个订阅实例,并且需要偶尔调用 request
以获取更多项目。
没有压力Subscriber
如果您有一个用例,其中您不需要应用背压并希望从降低的复杂性中获益,您可以实施 LaidBackSubscriber
,其中:
- 通过存储订阅并立即调用
request
实现onSubscribe
- 通过执行构造期间给定的 lambda 然后调用
subscription.request(1)
来实现 - 通过执行构造期间给定的 lambda 来实现
onError
和onComplete
onNext
这应该能满足您的需求。
一般建议
Java9 Flow APIwas created 作为现有异步库的集成点,而不是邀请以临时方式实现反应式组件。试验起来很棒,但如果你真的想创建一个反应式系统,现有的库可能非常适合。
Java 9 Flow API 是一组准系统,包含 4 个接口和 1 个桥 class,从非反应性世界到反应性世界。没有运算符,没有方便的 lambda 版本,没有别的。
理论上,它的引入是为了允许 JDK 本身根据反应原理构建内部组件,但没有令人放心的迹象表明这种情况正在发生。
因此,用户有责任在此 API 上构建组件,这很困难、乏味且容易出错。您最好等待主流库发布兼容版本,或者坚持使用更可用的基于 Reactive-Streams.Org 的库,例如 RxJava 2 和 Reactor 3.
如果您仍然对手动构建 Flow API 感兴趣,可以查看我的 research/prototype 库 Reactive4JavaFlow, which has the desired lambda overload implemented。