RestartSource 屏蔽了包装源的物化值?
RestartSource masking the materialized value for the wrapped source?
我正在通过围绕各种功能添加一些重试逻辑来修改现有的流图。其中一件是源代码,在本例中恰好是来自 alpakka kafka 连接器的 kafka Consumer.committableSource
。在下游,该图需要 Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control]
类型,但是当我将可提交源包装在 RestartSource
中时,我最终得到 Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed]
我尝试在末尾添加 (Keep.both)
,但最终出现编译时错误。下面两个例子供参考:
val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed] = RestartSource.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 60.seconds,
randomFactor = .2
) {() => Consumer.committableSource(consumerSettings, subscription)}
val s: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = Consumer.committableSource(consumerSettings, subscription)
正如您所观察到的,以及在当前公开的 ticket, the materialized value of the original Source
is not exposed in the return value of the wrapping RestartSource
. To get around this, try using mapMaterializedValue
中所讨论的(免责声明:我没有测试以下内容):
val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = {
var control: Option[Control] = None
RestartSource.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 60.seconds,
randomFactor = .2
) { () =>
Consumer
.committableSource(consumerSettings, subscription)
.mapMaterializedValue { c =>
control = Some(c)
}
}
.mapMaterializedValue(_ => control)
.collect { case Some(c) => c }
}
您可以 preMaterialize
Source
将产生 Control
,如下所示:
Pair<Consumer.Control, Source<ConsumerMessage.CommittableOffset, NotUsed>> controlSourcePair =
origSrc.preMaterialize(materializer);
Source<ConsumerMessage.CommittableOffset, NotUsed> source =
RestartSource.withBackoff(
Duration.ofSeconds(1),
Duration.ofSeconds(10),
0.2,
20,
controlSourcePair::second);
source
.toMat(Committer.sink(CommitterSettings.create(system)
.withMaxBatch(1)), Keep.both())
.mapMaterializedValue(pair ->
Consumer.createDrainingControl(
new Pair<>(controlSourcePair.first(), pair.second())))
.run(materializer);
很抱歉没有为您提供等效的 Scala。
我正在通过围绕各种功能添加一些重试逻辑来修改现有的流图。其中一件是源代码,在本例中恰好是来自 alpakka kafka 连接器的 kafka Consumer.committableSource
。在下游,该图需要 Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control]
类型,但是当我将可提交源包装在 RestartSource
中时,我最终得到 Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed]
我尝试在末尾添加 (Keep.both)
,但最终出现编译时错误。下面两个例子供参考:
val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed] = RestartSource.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 60.seconds,
randomFactor = .2
) {() => Consumer.committableSource(consumerSettings, subscription)}
val s: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = Consumer.committableSource(consumerSettings, subscription)
正如您所观察到的,以及在当前公开的 ticket, the materialized value of the original Source
is not exposed in the return value of the wrapping RestartSource
. To get around this, try using mapMaterializedValue
中所讨论的(免责声明:我没有测试以下内容):
val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = {
var control: Option[Control] = None
RestartSource.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 60.seconds,
randomFactor = .2
) { () =>
Consumer
.committableSource(consumerSettings, subscription)
.mapMaterializedValue { c =>
control = Some(c)
}
}
.mapMaterializedValue(_ => control)
.collect { case Some(c) => c }
}
您可以 preMaterialize
Source
将产生 Control
,如下所示:
Pair<Consumer.Control, Source<ConsumerMessage.CommittableOffset, NotUsed>> controlSourcePair =
origSrc.preMaterialize(materializer);
Source<ConsumerMessage.CommittableOffset, NotUsed> source =
RestartSource.withBackoff(
Duration.ofSeconds(1),
Duration.ofSeconds(10),
0.2,
20,
controlSourcePair::second);
source
.toMat(Committer.sink(CommitterSettings.create(system)
.withMaxBatch(1)), Keep.both())
.mapMaterializedValue(pair ->
Consumer.createDrainingControl(
new Pair<>(controlSourcePair.first(), pair.second())))
.run(materializer);
很抱歉没有为您提供等效的 Scala。