Kafka 主题到 websocket
Kafka topic to websocket
我正在尝试实施一个设置,其中我有多个网络浏览器打开到我的 akka-http 服务器的 websocket 连接,以便阅读发布到 kafka 主题的所有消息。
所以消息流应该这样走
kafka topic -> akka-http -> websocket connection 1
-> websocket connection 2
-> websocket connection 3
现在我已经为 websocket 创建了一个路径:
val route: Route =
path("ws") {
handleWebSocketMessages(notificationWs)
}
然后我为我的 kafka 主题创建了一个消费者:
val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
.plainSource(consumerSettings, Subscriptions.topics("topic1"))
最后我想将这个源连接到 handleWebSocketMessages 中的 websocket
def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
TextMessage(source)::Nil
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Nil
}
这是我在 TextMessage 中尝试使用 source
时遇到的错误:
Error:(77, 9) overloaded method value apply with alternatives:
(textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage
(text: String)akka.http.scaladsl.model.ws.TextMessage.Strict
cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control])
TextMessage(source)::Nil
我想我一路上犯了很多错误,但我想说最阻碍的部分是 handleWebSocketMessages
。
首先要了解源的类型:Source[ConsumerRecord[K, V], Control]
。
因此,您不能将其作为 TextMessage 的参数传递。
现在,让我们从websocket的角度来看:
- 为 Kafka 源中的每条消息构建一个传出消息。该消息将是来自 Kafka 消息的字符串转换的 TextMessage。
- 对于每条传入的消息,只需 println() 即可
因此,Flow
可以看作两个组成部分:Source
和 Sink
。
val incomingMessages: Sink[Message, NotUsed] =
Sink.foreach(println(_))
val outgoingMessages: Source[Message, NotUsed] =
source
.map { consumerRecord => TextMessage(consumerRecord.record.value) }
val handleWebSocketMessages: Flow[Message, Message, Any]
= Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
希望对您有所帮助。
我正在尝试实施一个设置,其中我有多个网络浏览器打开到我的 akka-http 服务器的 websocket 连接,以便阅读发布到 kafka 主题的所有消息。
所以消息流应该这样走
kafka topic -> akka-http -> websocket connection 1
-> websocket connection 2
-> websocket connection 3
现在我已经为 websocket 创建了一个路径:
val route: Route =
path("ws") {
handleWebSocketMessages(notificationWs)
}
然后我为我的 kafka 主题创建了一个消费者:
val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
.plainSource(consumerSettings, Subscriptions.topics("topic1"))
最后我想将这个源连接到 handleWebSocketMessages 中的 websocket
def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
TextMessage(source)::Nil
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Nil
}
这是我在 TextMessage 中尝试使用 source
时遇到的错误:
Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil
我想我一路上犯了很多错误,但我想说最阻碍的部分是 handleWebSocketMessages
。
首先要了解源的类型:Source[ConsumerRecord[K, V], Control]
。
因此,您不能将其作为 TextMessage 的参数传递。
现在,让我们从websocket的角度来看:
- 为 Kafka 源中的每条消息构建一个传出消息。该消息将是来自 Kafka 消息的字符串转换的 TextMessage。
- 对于每条传入的消息,只需 println() 即可
因此,Flow
可以看作两个组成部分:Source
和 Sink
。
val incomingMessages: Sink[Message, NotUsed] =
Sink.foreach(println(_))
val outgoingMessages: Source[Message, NotUsed] =
source
.map { consumerRecord => TextMessage(consumerRecord.record.value) }
val handleWebSocketMessages: Flow[Message, Message, Any]
= Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
希望对您有所帮助。