RestartSource 屏蔽了包装源的物化值?

RestartSource masking the materialized value for the wrapped source?

我正在通过围绕各种功能添加一些重试逻辑来修改现有的流图。其中一件是源代码,在本例中恰好是来自 alpakka kafka 连接器的 kafka Consumer.committableSource。在下游,该图需要 Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] 类型,但是当我将可提交源包装在 RestartSource 中时,我最终得到 Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed]

我尝试在末尾添加 (Keep.both),但最终出现编译时错误。下面两个例子供参考:

val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed] = RestartSource.onFailuresWithBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 60.seconds,
  randomFactor = .2
) {() => Consumer.committableSource(consumerSettings, subscription)}

val s: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = Consumer.committableSource(consumerSettings, subscription)

正如您所观察到的,以及在当前公开的 ticket, the materialized value of the original Source is not exposed in the return value of the wrapping RestartSource. To get around this, try using mapMaterializedValue 中所讨论的(免责声明:我没有测试以下内容):

val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = {
  var control: Option[Control] = None

  RestartSource.onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 60.seconds,
    randomFactor = .2
  ) { () =>
    Consumer
      .committableSource(consumerSettings, subscription)
      .mapMaterializedValue { c =>
        control = Some(c)
      }
  }
  .mapMaterializedValue(_ => control)
  .collect { case Some(c) => c }
}

您可以 preMaterialize Source 将产生 Control,如下所示:

Pair<Consumer.Control, Source<ConsumerMessage.CommittableOffset, NotUsed>> controlSourcePair =
    origSrc.preMaterialize(materializer);

Source<ConsumerMessage.CommittableOffset, NotUsed> source =
    RestartSource.withBackoff(
        Duration.ofSeconds(1),
        Duration.ofSeconds(10),
        0.2,
        20,
        controlSourcePair::second);
source
    .toMat(Committer.sink(CommitterSettings.create(system)
        .withMaxBatch(1)), Keep.both())
    .mapMaterializedValue(pair -> 
        Consumer.createDrainingControl(
            new Pair<>(controlSourcePair.first(), pair.second())))
    .run(materializer);

很抱歉没有为您提供等效的 Scala。