An Akka Streams function that creates a sink and a source that emits whatever the sink receives

The goal is to implement a function with this signature:

def bindedSinkAndSource[A]:(Sink[A, Any], Source[A, Any]) = ???

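The returned source should emit whatever the sink receives.

My main goal is to implement a websocket forwarder by means of the handleWebSocketMessages directive.
The forwarder graph is: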

leftReceiver ~> rightEmitter
leftEmitter <~ rightReceiver

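where leftReceiver and leftEmitter are the inlet and outlet of the left endpoint handler flow, and rightReceiver and rightEmitter are the inlet and outlet of the right endpoint handler flow.

For example: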

import akka.NotUsed
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directive.addByNameNullaryApply
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

def buildHandlers(): Route = {
    val (leftReceiver, rightEmitter) = bindedSinkAndSource[Message];
    val (rightReceiver, leftEmitter) = bindedSinkAndSource[Message];

    val leftHandlerFlow = Flow.fromSinkAndSource(leftReceiver, leftEmitter)
    val rightHandlerFlow = Flow.fromSinkAndSource(rightReceiver, rightEmitter)

    pathPrefix("leftEndpointChannel") {
        handleWebSocketMessages(leftHandlerFlow)
    } ~
        pathPrefix("rightEndpointChannel") {
            handleWebSocketMessages(rightHandlerFlow)
        }
}

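All my ideas were frustrated by the fact that the handleWebSocketMessages(..) directive does not give access to the materialized value of the flow it receives.

I found a way to achieve the goal, but there may be a shorter and simpler one. If you know it, please don't hesitate to share it.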

import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

import akka.NotUsed
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

def bindedSinkAndSource[A]: (Sink[A, NotUsed], Source[A, NotUsed]) = {

    class Binder extends Subscriber[A] with Publisher[A] { binder =>
        var oUpStreamSubscription: Option[Subscription] = None;
        var oDownStreamSubscriber: Option[Subscriber[_ >: A]] = None;
        var pendingRequestFromDownStream: Option[Long] = None;
        var pendingCancelFromDownStream: Boolean = false;

        def onSubscribe(upStreamSubscription: Subscription): Unit = {
            this.oUpStreamSubscription match {
                case Some(_) => upStreamSubscription.cancel // rule 2-5
                case None =>
                    this.oUpStreamSubscription = Some(upStreamSubscription);
                    if (pendingRequestFromDownStream.isDefined) {
                        upStreamSubscription.request(pendingRequestFromDownStream.get)
                        pendingRequestFromDownStream = None
                    }
                    if (pendingCancelFromDownStream) {
                        upStreamSubscription.cancel()
                        pendingCancelFromDownStream = false
                    }
            }
        }

        def onNext(a: A): Unit = {
            oDownStreamSubscriber.get.onNext(a)
        }

        def onComplete(): Unit = {
            oDownStreamSubscriber.foreach { _.onComplete() };
            this.oUpStreamSubscription = None
        }

        def onError(error: Throwable): Unit = {
            oDownStreamSubscriber.foreach { _.onError(error) };
            this.oUpStreamSubscription = None
        }

        def subscribe(downStreamSubscriber: Subscriber[_ >: A]): Unit = {
            assert(this.oDownStreamSubscriber.isEmpty);
            this.oDownStreamSubscriber = Some(downStreamSubscriber);

            downStreamSubscriber.onSubscribe(new Subscription() {
                def request(n: Long): Unit = {
                    binder.oUpStreamSubscription match {
                        case Some(usSub) => usSub.request(n);
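                        case None =>
                            // Upstream not subscribed yet: accumulate the demand, since a
                            // subscriber may call request(n) more than once before that happens.
                            binder.pendingRequestFromDownStream =
                                Some(binder.pendingRequestFromDownStream.getOrElse(0L) + n);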
                    }
                };
                def cancel(): Unit = {
                    binder.oUpStreamSubscription match {
                        case Some(usSub) => usSub.cancel();
                        case None =>
                            assert(binder.pendingCancelFromDownStream == false);
                            binder.pendingCancelFromDownStream = true;
                    }
                    binder.oDownStreamSubscriber = None
                }
            })
        }
    }

    val binder = new Binder;
    val receiver = Sink.fromSubscriber(binder);
    val emitter = Source.fromPublisher(binder);
    (receiver, emitter);
}   

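Note that the instance variables of the Binder class may suffer from concurrency problems if the user does not later fuse the sink and the source created by this method. When they are not fused, every access to these variables should be enclosed in a synchronized block. Another solution is to ensure that the sink and the source are materialized in an execution context with a single thread.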
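As a rough illustration of the synchronized approach, here is a minimal sketch of how the pending-demand bookkeeping alone could be guarded. The class DemandRelay and its methods are hypothetical names introduced only for this example; they are not part of Akka or Reactive Streams, and the upstream subscription is modelled as a plain function to keep the sketch dependency-free.

```scala
// Hypothetical helper (not an Akka or Reactive Streams API): records demand
// requested before the upstream is known and flushes it once it is attached.
final class DemandRelay {
  // Both fields are only read or written inside synchronized blocks.
  private var upstream: Option[Long => Unit] = None
  private var pending: Long = 0L

  /** Called from the downstream side, possibly on another thread. */
  def request(n: Long): Unit = {
    val target = synchronized {
      upstream match {
        case attached @ Some(_) => attached
        case None =>
          pending += n // upstream not attached yet: remember the demand
          None
      }
    }
    // Forward outside the lock so the callback never runs while it is held.
    target.foreach(forward => forward(n))
  }

  /** Called once the upstream subscription becomes available. */
  def attach(requestUpstream: Long => Unit): Unit = {
    val toFlush = synchronized {
      upstream = Some(requestUpstream)
      val p = pending
      pending = 0L
      p
    }
    if (toFlush > 0) requestUpstream(toFlush)
  }
}
```

The same pattern (mutate state under the lock, call out after releasing it) would have to be applied to the cancel flag and the subscriber reference as well.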

Two days later I discovered MergeHub and BroadcastHub. Using them, the answer is much shorter:

import akka.stream.Materializer
def bindedSinkAndSource[T](implicit sm: Materializer): (Sink[T, NotUsed], Source[T, NotUsed]) = {
  import akka.stream.scaladsl.BroadcastHub;
  import akka.stream.scaladsl.MergeHub;
  import akka.stream.scaladsl.Keep;

  MergeHub.source[T](perProducerBufferSize = 8)
    .toMat(BroadcastHub.sink[T](bufferSize = 256))(Keep.both)
    .run()
}

An advantage is that the returned sink and source can be materialized multiple times.
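For a quick check of the hub-based version outside of a websocket route, something like the following sketch may help. It assumes Akka 2.6 or later (for Materializer(system)); the object name BoundSinkAndSourceDemo, the system name and the three-second timeout are arbitrary choices made for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BoundSinkAndSourceDemo {

  def bindedSinkAndSource[T](implicit sm: Materializer): (Sink[T, NotUsed], Source[T, NotUsed]) =
    MergeHub.source[T](perProducerBufferSize = 8)
      .toMat(BroadcastHub.sink[T](bufferSize = 256))(Keep.both)
      .run()

  /** Pushes 1, 2, 3 into the sink and returns what the source emitted. */
  def demo(): Seq[Int] = {
    val system = ActorSystem("bound-sink-source-demo")
    implicit val mat: Materializer = Materializer(system) // assumes Akka 2.6+
    try {
      val (sink, source) = bindedSinkAndSource[Int]

      // Attach the consumer first. The hub never completes on its own,
      // so take(3) is what ends this consumer stream.
      val received = source.take(3).runWith(Sink.seq)

      // Attach a producer; its elements travel through both hubs.
      Source(1 to 3).runWith(sink)

      Await.result(received, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

Because both hubs are dynamic, further producers and consumers can be attached to the same pair later, which is what makes this variant suitable for flows that are materialized once per websocket connection.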