Akka StreamRefs - IllegalStateException (Saw RemoteStreamCompleted while in state UpstreamTerminated)
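I'm trying to use Akka stream refs (akka-streams library version 2.6.3) to send an audio stream from service A to service B. Everything works well, except that roughly once a month (the service is used about 50k times a day) an exception is thrown inside Akka stream refs, and I can't find the cause.
The error stack trace is as follows: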
Caused by: java.lang.IllegalStateException: [SourceRef-46] Saw RemoteStreamCompleted(37) while in state UpstreamTerminated(Actor[akka://system-name@serviceA:34363/system/Materializers/StreamSupervisor-3/$$S4-SinkRef-3405#-939568637]), should never happen
at akka.stream.impl.streamref.SourceRefStageImpl$$anon.$anonfun$receiveRemoteMessage(SourceRefImpl.scala:285)
at akka.stream.impl.streamref.SourceRefStageImpl$$anon.$anonfun$receiveRemoteMessage$adapted(SourceRefImpl.scala:196)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:243)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback(GraphStage.scala:202)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$adapted(GraphStage.scala:202)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:466)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:497)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:783)
at akka.actor.Actor.aroundReceive(Actor.scala:534)
at akka.actor.Actor.aroundReceive$(Actor.scala:532)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
... 11 common frames omitted
The code in service A responsible for pushing the audio through a SourceRef:
Materializer materializer = Materializer.createMaterializer(actorSystem);
AudioExtractor extractor = new AudioExtractorImpl("/path/to/audio/file"); // gets all audio bytes from audio file and puts them into chunks (byte arrays of certain length)
List<AudioChunk> audioChunkList = extractor.getChunkedBytesIntoList();
SourceRef<AudioChunk> sourceRef = Source.from(audioChunkList)
.runWith(StreamRefs.sourceRef(), materializer);
// wrap the sourceRef into msg
serviceBActor.tell(wrappedAudioSourceRefInMsg, getSelf());
And the code in service B responsible for receiving the audio:
private final List<AudioChunk> audioChunksBuffer = new ArrayList<>();
private final Materializer materializer;
@Override
public Receive createReceive() {
    return receiveBuilder().match(WrappedAudioSourceRefInMsg.class, response -> {
        response.getSourceRef()
            .getSource()
            .runWith(Sink.forEach(chunk -> audioChunksBuffer.add(chunk)), materializer);
    }).build();
}
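What I have confirmed is that this error always happens after service A has sent all of the audio and the stream has completed. I can't figure out why the SourceRef receives RemoteStreamCompleted while in state UpstreamTerminated. The "should never happen" part of the message is especially frustrating. :|
Any help is very welcome.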
Closing this question; the bug has been reported to Akka here: https://github.com/akka/akka/issues/28852