Akka Streams 丢弃消息?
Akka Streams dropping messages?
我是 Akka Streams 的新手。我在 Java 中使用它。 (akka-stream_2.12, 版本: 2.5.14).
我写了下面的class:
package main;
import java.io.IOException;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
public class AkkaTest {
public static void main(String[] args) throws IOException, InterruptedException {
final ActorSystem actorSystem = ActorSystem.create("VehicleSystem");
final Materializer materializer = ActorMaterializer.create(actorSystem);
SourceQueueWithComplete<Object> componentA_outPort1 =
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for(int i=1; i<100000; i++)
componentA_outPort1.offer("Akka rocks: " + i);
System.in.read();
actorSystem.terminate();
System.out.println("Done.");
}
}
我希望代码打印 100000 条消息,因为这是迭代次数。相反,它只打印消息 1-101,然后是从大约 61000 开始的消息(即 "Akka rocks: 61000")。
所以大部分消息都没有打印出来。你能解释一下为什么吗?
这里问题的第一个提示是 "Done." 最后没有打印到控制台。相反,它打印在开头或 "Akka rocks" 打印之间的某个地方。
原因是SourceQueue.offer
是异步的。它 returns 一个 CompletionStage
而你不是在等待它的完成。事实上,一些流元素是 "lost" 可以用 method's documentation 来解释,具体是以下部分:
Additionally when using the backpressure overflowStrategy: - If the buffer is full the Future won't be completed until there is space in the buffer - Calling offer before the Future is completed in this case will return a failed Future
您可以通过以下方式验证这一点:
SourceQueueWithComplete<Object> componentA_outPort1 =
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for (int i=1; i<100000; i++) {
CompletionStage<QueueOfferResult> result = componentA_outPort1.offer("Akka rocks: " + i);
System.out.println(result);
}
你会看到很多这样的东西"scala.concurrent.java8.FuturesConvertersImpl$CF@39471dfa[Not completed]"
为了解决它,你应该等待报价的CompletionStage完成,有效地使整体调用同步,这似乎是你的意图:
SourceQueueWithComplete<Object> componentA_outPort1 =
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for (int i=1; i<100000; i++) {
componentA_outPort1.offer("Akka rocks: " + i).toCompletableFuture().join();
}
仍然"Done"不一定会在最后打印出来,因为offer的完成只保证你队列接受了这个元素,而不是它被完全处理了。另外,请记住 actorSystem.terminate()
也是异步的。
上述方法适用于您的情况,但在某些情况下可能不希望阻塞当前线程。对于像您这样的简单案例,可以通过使用不同的 Source
:
轻松避免
Source.range(1, 1000).map(i -> "Akka rocks: " + i)
对于更复杂的情况,请考虑 Source 的其他静态方法,例如采用 Iterable 的 Source.from
,或 Source.fromIterator
.
几件事:
- 喂
SourceQueue
的惯用方法是喂另一个 Source
(正如佩德罗在他的回答中提到的)。
- 您可能会在流处理完成之前终止 actor 系统。
接收器的物化值完成后关闭系统:
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.japi.Pair;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
// other imports...
Sink<String, CompletionStage<Done>> sink =
Sink.foreach(str -> System.out.println(str));
Source<String, SourceQueueWithComplete<String>> outPort =
Source.<String>queue(100, OverflowStrategy.backpressure()).async();
RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<Done>>> stream =
outPort.toMat(sink, Keep.both());
Pair<SourceQueueWithComplete<String>, CompletionStage<Done>> pair = stream.run();
Source.range(1, 100000)
.map(i -> "Akka rocks: " + i)
.mapAsync(1, s -> pair.first().offer(s))
.runWith(Sink.ignore(), materializer);
pair.second().thenRun(() -> actorSystem.terminate());
我是 Akka Streams 的新手。我在 Java 中使用它。 (akka-stream_2.12, 版本: 2.5.14).
我写了下面的class:
package main;
import java.io.IOException;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
public class AkkaTest {
public static void main(String[] args) throws IOException, InterruptedException {
final ActorSystem actorSystem = ActorSystem.create("VehicleSystem");
final Materializer materializer = ActorMaterializer.create(actorSystem);
SourceQueueWithComplete<Object> componentA_outPort1 =
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for(int i=1; i<100000; i++)
componentA_outPort1.offer("Akka rocks: " + i);
System.in.read();
actorSystem.terminate();
System.out.println("Done.");
}
}
我希望代码打印 100000 条消息,因为这是迭代次数。相反,它只打印消息 1-101,然后是从大约 61000 开始的消息(即 "Akka rocks: 61000")。
所以大部分消息都没有打印出来。你能解释一下为什么吗?
这里问题的第一个提示是 "Done." 最后没有打印到控制台。相反,它打印在开头或 "Akka rocks" 打印之间的某个地方。
原因是SourceQueue.offer
是异步的。它 returns 一个 CompletionStage
而你不是在等待它的完成。事实上,一些流元素是 "lost" 可以用 method's documentation 来解释,具体是以下部分:
Additionally when using the backpressure overflowStrategy: - If the buffer is full the Future won't be completed until there is space in the buffer - Calling offer before the Future is completed in this case will return a failed Future
您可以通过以下方式验证这一点:
SourceQueueWithComplete<Object> componentA_outPort1 =
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for (int i=1; i<100000; i++) {
CompletionStage<QueueOfferResult> result = componentA_outPort1.offer("Akka rocks: " + i);
System.out.println(result);
}
你会看到很多这样的东西"scala.concurrent.java8.FuturesConvertersImpl$CF@39471dfa[Not completed]"
为了解决它,你应该等待报价的CompletionStage完成,有效地使整体调用同步,这似乎是你的意图:
SourceQueueWithComplete<Object> componentA_outPort1 =
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for (int i=1; i<100000; i++) {
componentA_outPort1.offer("Akka rocks: " + i).toCompletableFuture().join();
}
仍然"Done"不一定会在最后打印出来,因为offer的完成只保证你队列接受了这个元素,而不是它被完全处理了。另外,请记住 actorSystem.terminate()
也是异步的。
上述方法适用于您的情况,但在某些情况下可能不希望阻塞当前线程。对于像您这样的简单案例,可以通过使用不同的 Source
:
Source.range(1, 1000).map(i -> "Akka rocks: " + i)
对于更复杂的情况,请考虑 Source 的其他静态方法,例如采用 Iterable 的 Source.from
,或 Source.fromIterator
.
几件事:
- 喂
SourceQueue
的惯用方法是喂另一个Source
(正如佩德罗在他的回答中提到的)。 - 您可能会在流处理完成之前终止 actor 系统。
接收器的物化值完成后关闭系统:
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.japi.Pair;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
// other imports...
Sink<String, CompletionStage<Done>> sink =
Sink.foreach(str -> System.out.println(str));
Source<String, SourceQueueWithComplete<String>> outPort =
Source.<String>queue(100, OverflowStrategy.backpressure()).async();
RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<Done>>> stream =
outPort.toMat(sink, Keep.both());
Pair<SourceQueueWithComplete<String>, CompletionStage<Done>> pair = stream.run();
Source.range(1, 100000)
.map(i -> "Akka rocks: " + i)
.mapAsync(1, s -> pair.first().offer(s))
.runWith(Sink.ignore(), materializer);
pair.second().thenRun(() -> actorSystem.terminate());