Scala RestartSink 未来
Scala RestartSink Future
我正在尝试重新创建 Scala 的 [RestartSink][1]
特性的类似功能。
我想出了这段代码。但是,由于我们只 return a SinkShape
而不是 Sink
,我在指定它应该 return a Future[Done]
而不是 [=16] 时遇到了麻烦=] 因为它是物化类型。但是,我对如何做到这一点感到困惑。我只能得到它 return [MessageActionPair, NotUsed]
而不是想要的 [MessageActionPair, Future[Done]]
。我仍在学习围绕这个框架的方法,所以我确信我遗漏了一些小东西。我尝试调用 Source.toMat(RestartWithBackoffSink...)
,但是也没有得到预期的结果。
private final class RestartWithBackoffSink(
sourcePool: Seq[SqsEndpoint],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double) extends GraphStage[SinkShape[MessageActionPair]] { self ⇒
val in = Inlet[MessageActionPair]("RoundRobinRestartWithBackoffSink.in")
override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
override protected def logSource = self.getClass
override protected def startGraph() = {
val sourceOut = createSubOutlet(in)
Source.fromGraph(sourceOut.source).runWith(createSink(getEndpoint))(subFusingMaterializer)
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
}
private def createSink(endpoint: SqsEndpoint): Sink[MessageActionPair, Future[Done]] = {
SqsAckSink(endpoint.queue.url)(endpoint.client)
}
def getEndpoint: SqsEndpoint = {
if(isTimedOut) {
index = (index + 1) % sourcePool.length
restartCount = 0
}
sourcePool(index)
}
backoff()
}
}
这里有语法错误,因为类型不匹配:
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, Future[Done]] = {
Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
}
通过扩展 extends GraphStage[SinkShape[MessageActionPair]]
,您正在定义一个没有具体化价值的阶段。或者更好的是,您定义一个具体化为 NotUsed
.
的阶段
你必须决定你的舞台是否可以实现任何有意义的东西。有关阶段 here.
的具体化值的更多信息
如果是:您必须扩展GraphStageWithMaterializedValue[SinkShape[MessageActionPair], Future[Done]]
并正确覆盖createLogicAndMaterializedValue
函数。可以在 docs.
中找到更多指导
如果没有:您可以按照以下方式更改您的类型
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
}
我正在尝试重新创建 Scala 的 [RestartSink][1]
特性的类似功能。
我想出了这段代码。但是,由于我们只 return a SinkShape
而不是 Sink
,我在指定它应该 return a Future[Done]
而不是 [=16] 时遇到了麻烦=] 因为它是物化类型。但是,我对如何做到这一点感到困惑。我只能得到它 return [MessageActionPair, NotUsed]
而不是想要的 [MessageActionPair, Future[Done]]
。我仍在学习围绕这个框架的方法,所以我确信我遗漏了一些小东西。我尝试调用 Source.toMat(RestartWithBackoffSink...)
,但是也没有得到预期的结果。
private final class RestartWithBackoffSink(
sourcePool: Seq[SqsEndpoint],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double) extends GraphStage[SinkShape[MessageActionPair]] { self ⇒
val in = Inlet[MessageActionPair]("RoundRobinRestartWithBackoffSink.in")
override def shape = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
override protected def logSource = self.getClass
override protected def startGraph() = {
val sourceOut = createSubOutlet(in)
Source.fromGraph(sourceOut.source).runWith(createSink(getEndpoint))(subFusingMaterializer)
}
override protected def backoff() = {
setHandler(in, new InHandler {
override def onPush() = ()
})
}
private def createSink(endpoint: SqsEndpoint): Sink[MessageActionPair, Future[Done]] = {
SqsAckSink(endpoint.queue.url)(endpoint.client)
}
def getEndpoint: SqsEndpoint = {
if(isTimedOut) {
index = (index + 1) % sourcePool.length
restartCount = 0
}
sourcePool(index)
}
backoff()
}
}
这里有语法错误,因为类型不匹配:
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, Future[Done]] = {
Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
}
通过扩展 extends GraphStage[SinkShape[MessageActionPair]]
,您正在定义一个没有具体化价值的阶段。或者更好的是,您定义一个具体化为 NotUsed
.
你必须决定你的舞台是否可以实现任何有意义的东西。有关阶段 here.
的具体化值的更多信息如果是:您必须扩展GraphStageWithMaterializedValue[SinkShape[MessageActionPair], Future[Done]]
并正确覆盖createLogicAndMaterializedValue
函数。可以在 docs.
如果没有:您可以按照以下方式更改您的类型
def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, NotUsed] = {
Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
}