Flux 的 publishOn 方法没有按预期工作
The method publishOn of Flux doesn't not work as expected
我正在尝试将阻塞消费者集成为 Reactor Aluminium-SR1 中的 Flux 订阅者。我想使用并行 Scheduler 来同时执行阻塞操作。
我已经实现了一个主要的 class 来描述我的意图:
package etienne.peiniau;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.elapsed()
.publishOn(Schedulers.parallel())
.subscribe(new Subscriber<Tuple2<Long, Integer>>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Tuple2<Long, Integer> t2) {
System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
try {
Thread.sleep(1000); // long operation
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("[" + Thread.currentThread().getName() + "] Complete");
}
});
// Waiting for the program to complete
System.out.println("[" + Thread.currentThread().getName() + "] Main");
Thread.sleep(100000);
}
}
这段代码的输出如下:
[main] Subscription
[main] Main
[parallel-1] [3,1]
[parallel-1] [1000,2]
[parallel-1] [1001,3]
[parallel-1] [1000,4]
[parallel-1] [1000,5]
[parallel-1] [1000,6]
[parallel-1] [1001,7]
[parallel-1] [1000,8]
[parallel-1] [1000,9]
[parallel-1] [1000,10]
[parallel-1] [1000,11]
[parallel-1] [1001,12]
[parallel-1] [1000,13]
[parallel-1] [1000,14]
[parallel-1] [1000,15]
[parallel-1] [1000,16]
[parallel-1] [1001,17]
[parallel-1] [1000,18]
[parallel-1] [1000,19]
[parallel-1] [1000,20]
[parallel-1] Complete
我的问题是长操作总是在线程 parallel-1 上执行,并且每 1 秒执行一次。
我尝试手动增加并行度或使用弹性 Scheduler,但结果是一样的。
我在想 publishOn 方法是专门为这个用例设计的。如果我误解了什么,你能告诉我吗?
实际上它按预期工作,您可以看到并行处理的所有值 - 经过的时间几乎相同,但您总是在同一线程中接收元素,并且每次等待 1 秒时都是这样。
我想在简单的 Flux
中,并行并不意味着更多线程,它意味着并行工作。例如,如果您 运行 代码如下:
Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
.map(i -> {
System.out.println("map [" + Thread.currentThread().getName() + "] " + i);
return i;
})
.elapsed()
.publishOn(Schedulers.single())
.subscribeOn(Schedulers.single())
.subscribe(t2 -> {
System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2);
});
您将看到结果:
map [single-1] 0
map [single-1] 1
...
subscribe [single-1] [4,0]
subscribe [single-1] [0,1]
...
您可以看到,它首先对所有元素执行 map
,然后是 consume
。如果将 publishOn
更改为 .publishOn(Schedulers.parallel())
,您将看到:
map [single-1] 3
subscribe [parallel-1] [5,0]
map [single-1] 4
subscribe [parallel-1] [0,1]
map [single-1] 5
...
现在它同时在并行线程中执行这两个操作。我不确定我是否理解正确。
并行执行有特定的 ParallelFlux
。在下面的示例中,所有内容都将在不同的线程上完成:
Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
.elapsed()
.parallel()
.runOn(Schedulers.parallel())
.subscribe(t2 -> {
System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
try {
Thread.sleep(1000); // long operation
} catch (InterruptedException e) {
e.printStackTrace();
}
}, throwable -> {
System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
}, () -> {
System.out.println("[" + Thread.currentThread().getName() + "] Complete");
}, subscription -> {
System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
subscription.request(Long.MAX_VALUE);
});
结果如下所示:
[parallel-1] [8,0]
[parallel-2] [0,1]
[parallel-3] [0,2]
[parallel-4] [0,3]
[parallel-1] [0,4]
...
所以它使用很少的线程来处理结果。在我看来这真的是平行的。
另请注意,如果您使用方法 .subscribe(Subscriber<? super T> s)
,所有结果将按顺序使用,并并行使用它们,您应该使用:
public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)
或任何其他带有 Consumer<? super T> onNext,...
个参数的重载方法
我正在尝试将阻塞消费者集成为 Reactor Aluminium-SR1 中的 Flux 订阅者。我想使用并行 Scheduler 来同时执行阻塞操作。
我已经实现了一个主要的 class 来描述我的意图:
package etienne.peiniau;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.elapsed()
.publishOn(Schedulers.parallel())
.subscribe(new Subscriber<Tuple2<Long, Integer>>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Tuple2<Long, Integer> t2) {
System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
try {
Thread.sleep(1000); // long operation
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("[" + Thread.currentThread().getName() + "] Complete");
}
});
// Waiting for the program to complete
System.out.println("[" + Thread.currentThread().getName() + "] Main");
Thread.sleep(100000);
}
}
这段代码的输出如下:
[main] Subscription
[main] Main
[parallel-1] [3,1]
[parallel-1] [1000,2]
[parallel-1] [1001,3]
[parallel-1] [1000,4]
[parallel-1] [1000,5]
[parallel-1] [1000,6]
[parallel-1] [1001,7]
[parallel-1] [1000,8]
[parallel-1] [1000,9]
[parallel-1] [1000,10]
[parallel-1] [1000,11]
[parallel-1] [1001,12]
[parallel-1] [1000,13]
[parallel-1] [1000,14]
[parallel-1] [1000,15]
[parallel-1] [1000,16]
[parallel-1] [1001,17]
[parallel-1] [1000,18]
[parallel-1] [1000,19]
[parallel-1] [1000,20]
[parallel-1] Complete
我的问题是长操作总是在线程 parallel-1 上执行,并且每 1 秒执行一次。
我尝试手动增加并行度或使用弹性 Scheduler,但结果是一样的。
我在想 publishOn 方法是专门为这个用例设计的。如果我误解了什么,你能告诉我吗?
实际上它按预期工作,您可以看到并行处理的所有值 - 经过的时间几乎相同,但您总是在同一线程中接收元素,并且每次等待 1 秒时都是这样。
我想在简单的 Flux
中,并行并不意味着更多线程,它意味着并行工作。例如,如果您 运行 代码如下:
Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
.map(i -> {
System.out.println("map [" + Thread.currentThread().getName() + "] " + i);
return i;
})
.elapsed()
.publishOn(Schedulers.single())
.subscribeOn(Schedulers.single())
.subscribe(t2 -> {
System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2);
});
您将看到结果:
map [single-1] 0
map [single-1] 1
...
subscribe [single-1] [4,0]
subscribe [single-1] [0,1]
...
您可以看到,它首先对所有元素执行 map
,然后是 consume
。如果将 publishOn
更改为 .publishOn(Schedulers.parallel())
,您将看到:
map [single-1] 3
subscribe [parallel-1] [5,0]
map [single-1] 4
subscribe [parallel-1] [0,1]
map [single-1] 5
...
现在它同时在并行线程中执行这两个操作。我不确定我是否理解正确。
并行执行有特定的 ParallelFlux
。在下面的示例中,所有内容都将在不同的线程上完成:
Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
.elapsed()
.parallel()
.runOn(Schedulers.parallel())
.subscribe(t2 -> {
System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
try {
Thread.sleep(1000); // long operation
} catch (InterruptedException e) {
e.printStackTrace();
}
}, throwable -> {
System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
}, () -> {
System.out.println("[" + Thread.currentThread().getName() + "] Complete");
}, subscription -> {
System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
subscription.request(Long.MAX_VALUE);
});
结果如下所示:
[parallel-1] [8,0]
[parallel-2] [0,1]
[parallel-3] [0,2]
[parallel-4] [0,3]
[parallel-1] [0,4]
...
所以它使用很少的线程来处理结果。在我看来这真的是平行的。
另请注意,如果您使用方法 .subscribe(Subscriber<? super T> s)
,所有结果将按顺序使用,并并行使用它们,您应该使用:
public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)
或任何其他带有 Consumer<? super T> onNext,...
个参数的重载方法