Akka:有没有永不拉动的水槽?
Akka: is there a Sink that never pulls?
需要一个永不拉动的 Sink
,以便在单元测试中使用。
是否已经有可用的或者我需要自己编写代码?
请注意,Sink.ignore()
无济于事,因为它总是拉。
我需要一个永远不会拉动的水槽。
直接回答
您可以创建一个从不调用 Subscription.request
的 org.reactivestreams.Subscriber
:
import org.reactivestreams.Subscriber
def nonSubscriber[T] : Subscriber[T] = new Subscriber[T] {
override def onComplete() : Unit = {}
override def onError(throwable: java.lang.Throwable) : Unit = {}
//should never be called therefore definition is not implemented
override def onNext(t: T) : Unit = ???
//does not call s.request
override def onSubscribe(s: Subscription) : Unit = {}
}
此订阅者随后可用于实例化 Sink
:
import akka.NotUsed
import akka.stream.scaladsl.Sink
def nonSubscribingSink[T] : Sink[T, NotUsed] =
Sink.fromSubscriber[T](nonSubscriber[T])
间接回答
问题的性质表明您将 "business logic" 与 akka 流逻辑混合在一起。您可能需要考虑 a re-design,这可能会使您的问题的答案变得不必要。
最终创建了我自己的实现:
// sink that does not pull
val snkStage = object : GraphStage<SinkShape<Message>>() {
val shape = SinkShape(Inlet.create<Message>("in"))
override fun shape() = shape
override fun createLogic(inheritedAttributes: Attributes): GraphStageLogic = object : GraphStageLogic(shape) {
init {
setHandler(shape.`in`()) {}
}
}
}
但后来决定使用更传统的 Sink.ignore()
和 Source.maybe()
组合。
需要一个永不拉动的 Sink
,以便在单元测试中使用。
是否已经有可用的或者我需要自己编写代码?
请注意,Sink.ignore()
无济于事,因为它总是拉。
我需要一个永远不会拉动的水槽。
直接回答
您可以创建一个从不调用 Subscription.request
的 org.reactivestreams.Subscriber
:
import org.reactivestreams.Subscriber
def nonSubscriber[T] : Subscriber[T] = new Subscriber[T] {
override def onComplete() : Unit = {}
override def onError(throwable: java.lang.Throwable) : Unit = {}
//should never be called therefore definition is not implemented
override def onNext(t: T) : Unit = ???
//does not call s.request
override def onSubscribe(s: Subscription) : Unit = {}
}
此订阅者随后可用于实例化 Sink
:
import akka.NotUsed
import akka.stream.scaladsl.Sink
def nonSubscribingSink[T] : Sink[T, NotUsed] =
Sink.fromSubscriber[T](nonSubscriber[T])
间接回答
问题的性质表明您将 "business logic" 与 akka 流逻辑混合在一起。您可能需要考虑 a re-design,这可能会使您的问题的答案变得不必要。
最终创建了我自己的实现:
// sink that does not pull
val snkStage = object : GraphStage<SinkShape<Message>>() {
val shape = SinkShape(Inlet.create<Message>("in"))
override fun shape() = shape
override fun createLogic(inheritedAttributes: Attributes): GraphStageLogic = object : GraphStageLogic(shape) {
init {
setHandler(shape.`in`()) {}
}
}
}
但后来决定使用更传统的 Sink.ignore()
和 Source.maybe()
组合。