当 Source 有大量记录时,Akka 流不会 运行
Akka streams don't run when Source has large number of records
我正在尝试编写一个使用 Akka Streams 的非常简单的介绍性示例。我正在尝试基本上创建一个流,该流以 运行ge 个整数作为源并过滤掉所有非素数的整数,从而生成一个素数流作为其输出。
构造流的class比较简单;为此,我有以下内容。
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PrimeStream {
private final AverageRepository averageRepository = new AverageRepository();
private final ActorSystem actorSystem;
public PrimeStream(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
public Flow<Integer, Integer, NotUsed> filterPrimes() {
return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
}
}
当我运行以下测试时,它工作正常。
private final ActorSystem actorSystem = ActorSystem.create("Sys");
@Test
public void testStreams() {
Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}
但是,当我通过将测试中的行更改为以下内容将 运行ge 增加 x10 倍时,它不再有效。
Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);
现在测试运行s时,没有异常抛出,没有警告。它只是 运行s,然后退出,根本不向控制台显示任何文本。
为了更加确定问题不在我的素数测试本身,我 运行 在不使用 Akka Streams 的情况下对相同的 运行ge 进行了测试,它 运行 很好。下面的代码运行s没有问题。
@Test
public void testPlain() {
List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
List<Integer> out = PrimeKernel.filterPrimes(in);
System.out.println(out);
}
为了清楚起见,素数测试本身接受一个整数列表,如果列表中的任何元素不是素数,则将其设置为 0。
正如@RamonJRomeroyVigil 所建议的那样,如果我将 mapConcat 部分全部删除,但保留所有内容不变,实际上会打印出 10,000 个整数。但是,如果我保持一切不变,只是简单地将 filterPrimes 替换为仅 returns 方法参数而不触及它的方法,那么它根本不会在屏幕上打印任何内容。我还尝试在开始的 filterPrime 中添加一个 println 来调试它。每当它不打印任何包含调试语句的输出时。所以根本没有尝试调用 filterPrimes。
runForeach
returns a CompletionStage
,所以如果你想看到所有的数字都被打印出来,那么你必须在 CompletionStage
上等待,否则测试函数 returns 并且程序在 CompletionStage
未完成的情况下终止。
示例:
flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();
我正在尝试编写一个使用 Akka Streams 的非常简单的介绍性示例。我正在尝试基本上创建一个流,该流以 运行ge 个整数作为源并过滤掉所有非素数的整数,从而生成一个素数流作为其输出。
构造流的class比较简单;为此,我有以下内容。
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PrimeStream {
private final AverageRepository averageRepository = new AverageRepository();
private final ActorSystem actorSystem;
public PrimeStream(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
public Flow<Integer, Integer, NotUsed> filterPrimes() {
return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
}
}
当我运行以下测试时,它工作正常。
private final ActorSystem actorSystem = ActorSystem.create("Sys");
@Test
public void testStreams() {
Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}
但是,当我通过将测试中的行更改为以下内容将 运行ge 增加 x10 倍时,它不再有效。
Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);
现在测试运行s时,没有异常抛出,没有警告。它只是 运行s,然后退出,根本不向控制台显示任何文本。
为了更加确定问题不在我的素数测试本身,我 运行 在不使用 Akka Streams 的情况下对相同的 运行ge 进行了测试,它 运行 很好。下面的代码运行s没有问题。
@Test
public void testPlain() {
List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
List<Integer> out = PrimeKernel.filterPrimes(in);
System.out.println(out);
}
为了清楚起见,素数测试本身接受一个整数列表,如果列表中的任何元素不是素数,则将其设置为 0。
正如@RamonJRomeroyVigil 所建议的那样,如果我将 mapConcat 部分全部删除,但保留所有内容不变,实际上会打印出 10,000 个整数。但是,如果我保持一切不变,只是简单地将 filterPrimes 替换为仅 returns 方法参数而不触及它的方法,那么它根本不会在屏幕上打印任何内容。我还尝试在开始的 filterPrime 中添加一个 println 来调试它。每当它不打印任何包含调试语句的输出时。所以根本没有尝试调用 filterPrimes。
runForeach
returns a CompletionStage
,所以如果你想看到所有的数字都被打印出来,那么你必须在 CompletionStage
上等待,否则测试函数 returns 并且程序在 CompletionStage
未完成的情况下终止。
示例:
flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();