Akka 等待流完成
Akka wait for stream completion
我想知道是否可以等待以下流完成。对于某些人来说这可能是显而易见的,但我是 akka 的新手。
final Materializer mat = ActorMaterializer.create(getContext());
String path = "Databases\" + r.book.db + "\" + r.book.title + ".txt";
Stream<String> fileStream = Files.lines(Paths.get(path));
Source<StreamResult, NotUsed> lines = Source.from(fileStream
.map(e -> new StreamResult(r.replyTo, e))
.collect(Collectors.toList()));
lines.throttle(1, Duration.ofSeconds(1))
.runWith(Sink.actorRef(sender, new StreamEnd()), mat);
这里你需要注意两件事,就是你使用 Sink
(即流定义的结尾)和 materialised values
Sink.actorRef
returns Sink<In,NotUsed>
, so you can't just figure out stream termination information from what you have (because materialised value is NotUsed
in your case). Moreover you can't use other sink which provides this info (eg, Sink.ignore
, whose materialised value gives you Future
about when stream finishes), because in general stream has one sink (of course, you can use Flow.alsoToMat
,但有更好的方法)
您可以在最后一个 Sink
之前的流程上使用 Flow.watchTermination
,这将告诉您您的上游状态。可能它最有意义,因为您使用 "send and forget".
形式的 Sink.actorRef
我不知道您的完整用例,但以防万一您会发现 Flow.ask
有用。它也向 actor 发送消息,但也等待 actor 响应。
我想知道是否可以等待以下流完成。对于某些人来说这可能是显而易见的,但我是 akka 的新手。
final Materializer mat = ActorMaterializer.create(getContext());
String path = "Databases\" + r.book.db + "\" + r.book.title + ".txt";
Stream<String> fileStream = Files.lines(Paths.get(path));
Source<StreamResult, NotUsed> lines = Source.from(fileStream
.map(e -> new StreamResult(r.replyTo, e))
.collect(Collectors.toList()));
lines.throttle(1, Duration.ofSeconds(1))
.runWith(Sink.actorRef(sender, new StreamEnd()), mat);
这里你需要注意两件事,就是你使用 Sink
(即流定义的结尾)和 materialised values
Sink.actorRef
returns Sink<In,NotUsed>
, so you can't just figure out stream termination information from what you have (because materialised value is NotUsed
in your case). Moreover you can't use other sink which provides this info (eg, Sink.ignore
, whose materialised value gives you Future
about when stream finishes), because in general stream has one sink (of course, you can use Flow.alsoToMat
,但有更好的方法)
您可以在最后一个 Sink
之前的流程上使用 Flow.watchTermination
,这将告诉您您的上游状态。可能它最有意义,因为您使用 "send and forget".
Sink.actorRef
我不知道您的完整用例,但以防万一您会发现 Flow.ask
有用。它也向 actor 发送消息,但也等待 actor 响应。