Kafka 主题到 websocket

Kafka topic to websocket

我正在尝试实施一个设置,其中我有多个网络浏览器打开到我的 akka-http 服务器的 websocket 连接,以便阅读发布到 kafka 主题的所有消息。

所以消息流应该这样走

kafka topic -> akka-http -> websocket connection 1 
                         -> websocket connection 2
                         -> websocket connection 3 

现在我已经为 websocket 创建了一个路径:

val route: Route = 
 path("ws") {
   handleWebSocketMessages(notificationWs)
 }

然后我为我的 kafka 主题创建了一个消费者:

val consumerSettings = ConsumerSettings(system,
  new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("topic1"))

最后我想将这个源连接到 handleWebSocketMessages 中的 websocket

def handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(source)::Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

这是我在 TextMessage 中尝试使用 source 时遇到的错误:

Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil

我想我一路上犯了很多错误,但我想说最阻碍的部分是 handleWebSocketMessages

首先要了解源的类型:Source[ConsumerRecord[K, V], Control]。 因此,您不能将其作为 TextMessage 的参数传递。

现在,让我们从websocket的角度来看:

  • 为 Kafka 源中的每条消息构建一个传出消息。该消息将是来自 Kafka 消息的字符串转换的 TextMessage。
  • 对于每条传入的消息,只需 println() 即可

因此,Flow 可以看作两个组成部分:SourceSink

val incomingMessages: Sink[Message, NotUsed] =
  Sink.foreach(println(_))

val outgoingMessages: Source[Message, NotUsed] =
  source
    .map { consumerRecord => TextMessage(consumerRecord.record.value) }

val handleWebSocketMessages: Flow[Message, Message, Any]  
  = Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

希望对您有所帮助。