Akka StreamRefs - IllegalStateException (Saw RemoteStreamCompleted while in state UpstreamTerminated)

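I am trying to send an audio stream from service A to service B using Akka stream refs (akka-streams library version 2.6.3). Everything works fine, except that about once a month (the service handles roughly 50k calls per day) an exception is thrown inside Akka stream refs, and I cannot find the cause.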

The error stack trace is as follows:

        Caused by: java.lang.IllegalStateException: [SourceRef-46] Saw RemoteStreamCompleted(37) while in state UpstreamTerminated(Actor[akka://system-name@serviceA:34363/system/Materializers/StreamSupervisor-3/$$S4-SinkRef-3405#-939568637]), should never happen
            at akka.stream.impl.streamref.SourceRefStageImpl$$anon.$anonfun$receiveRemoteMessage(SourceRefImpl.scala:285)
            at akka.stream.impl.streamref.SourceRefStageImpl$$anon.$anonfun$receiveRemoteMessage$adapted(SourceRefImpl.scala:196)
            at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:243)
            at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback(GraphStage.scala:202)
            at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$adapted(GraphStage.scala:202)
            at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:466)
            at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:497)
            at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
            at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
            at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:783)
            at akka.actor.Actor.aroundReceive(Actor.scala:534)
            at akka.actor.Actor.aroundReceive$(Actor.scala:532)
            at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
            ... 11 common frames omitted

The code in service A responsible for pushing audio through the SourceRef:

Materializer materializer = Materializer.createMaterializer(actorSystem);
// Gets all audio bytes from the audio file and puts them into chunks
// (byte arrays of a certain length).
AudioExtractor extractor = new AudioExtractorImpl("/path/to/audio/file");
List<AudioChunk> audioChunkList = extractor.getChunkedBytesIntoList();
SourceRef<AudioChunk> sourceRef = Source.from(audioChunkList)
    .runWith(StreamRefs.sourceRef(), materializer);
// wrap the sourceRef into msg
serviceBActor.tell(wrappedAudioSourceRefInMsg, getSelf());

And the code in service B responsible for receiving the audio:

private final List<AudioChunk> audioChunksBuffer = new ArrayList<>();
private final Materializer materializer;

public Receive createReceive() {
    return receiveBuilder()
        .match(WrappedAudioSourceRefInMsg.class, response -> {
            // Materialize the remote source and append every chunk to the buffer.
            response.getSourceRef()
                .getSource()
                .runWith(Sink.forEach(chunk -> audioChunksBuffer.add(chunk)), materializer);
        })
        .build();
}

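What I have confirmed is that this error always happens after service A has sent all the audio and the stream has completed. I cannot figure out why the SourceRef receives RemoteStreamCompleted while in the UpstreamTerminated state. The "should never happen" part of the message is especially frustrating. :|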

Any help is very welcome.

Closing this; it has been reported as a bug in Akka here: https://github.com/akka/akka/issues/28852