Connect a producer flow to a graph
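I'm just getting started with akka streams kafka (and akka streams in general). I'm trying to build a graph that publishes messages to different topics.
How can I connect the producer as a flow so that I can commit the messages once they have been processed? I tried using Producer.flow, but I can't get at commitScaladsl.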
    object TestFoo {
      import akka.NotUsed
      import akka.actor.ActorSystem
      import akka.kafka.{ConsumerMessage, ConsumerSettings, ProducerSettings, Subscriptions}
      import akka.kafka.ConsumerMessage.CommittableOffset
      import akka.kafka.ProducerMessage.Message
      import akka.kafka.scaladsl.{Consumer, Producer}
      import akka.stream.{ActorMaterializer, ClosedShape}
      import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph}
      import org.apache.kafka.clients.consumer.ConsumerConfig
      import org.apache.kafka.clients.producer.ProducerRecord
      import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

      implicit val system = ActorSystem("test-kafka")
      implicit val materializer = ActorMaterializer()

      val evenNumbersTopic = "even_numbers"
      val allNumbersTopic = "all_numbers"

      // JsonDeserializer is not shown in the question (presumably user-defined)
      lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int])
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

      lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic)))

      val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092")

      val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        type TypedMessage = Message[String, Int, CommittableOffset]

        val bcast = b.add(Broadcast[TypedMessage](2))
        val merge = b.add(Merge[TypedMessage](2))

        // keep only messages carrying an even value
        val evenFilter = Flow[TypedMessage].filter(c => c.record.value() % 2 == 0)

        val justEven = Flow[TypedMessage].map {
          case Message(pr, offset) =>
            val r = new ProducerRecord[String, Int]("general", pr.value())
            Message(r, offset)
        }

        val allNumbers = Flow[TypedMessage].map {
          case Message(pr, offset) =>
            val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value())
            Message(r, offset)
        }

        // wrap each consumed record in a producer message, carrying its offset along
        val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map { msg =>
          val r = new ProducerRecord[String, Int]("general", msg.record.value())
          Message(r, msg.committableOffset)
        }

        source ~> toMsg ~> bcast
        bcast ~> evenFilter ~> justEven ~> merge
        bcast ~> allNumbers ~> merge
        merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result =>
          result.message.passThrough.commitScaladsl() // this doesn't compile, cannot get the .commitScaladsl()
        }
        ClosedShape
      })
    }
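Because you are using the GraphDSL, the compiler cannot infer the PassThrough type from the previous stage.
Try passing the type parameters to the Producer.flow function explicitly, e.g.

    merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result =>
      result.message.passThrough.commitScaladsl()
    }

I have left K and V as unbound parameters; put whatever key/value types your Producer has to produce there. If you want the code above to wire up correctly, the types of producerSettings must match the types coming out of the merge stage. You need something like this:

    val producerSettings = ProducerSettings(system, new StringSerializer(), new JsonSerializer[Int])
      .withBootstrapServers("localhost:9092")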
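
To make the inference problem easier to see in isolation, here is a minimal sketch in plain Scala with no Akka or Kafka dependency. Everything in it (Offset, Settings, Message, Result, Stage, and the local Producer object) is a hypothetical stand-in that only mimics the shape of the real types; it is not the library's API. The assumption is that the real Producer.flow behaves the same way, because its PassThrough type parameter does not appear in its argument list either.

```scala
// Hypothetical stand-ins that mimic the shape of the akka-stream-kafka types.
final case class Offset(id: Long) {
  def commit(): String = s"committed $id"
}
final case class Settings[K, V](name: String)
final case class Message[K, V, P](key: K, value: V, passThrough: P)
final case class Result[K, V, P](message: Message[K, V, P])

// A tiny stand-in for a flow: a wrapped function with a map operation.
final case class Stage[In, Out](run: In => Out) {
  def map[T](f: Out => T): Stage[In, T] = Stage[In, T](run.andThen(f))
}

object Producer {
  // P does not occur in the parameter list, so the compiler can only learn it
  // from an expected type or from an explicit type argument.
  def flow[K, V, P](settings: Settings[K, V]): Stage[Message[K, V, P], Result[K, V, P]] =
    Stage[Message[K, V, P], Result[K, V, P]](msg => Result(msg))
}

object InferenceDemo {
  val settings: Settings[String, Int] = Settings[String, Int]("demo")

  // Does not compile: the receiver of .map is typed without an expected type,
  // so P falls back to Nothing, and Nothing has no commit() member.
  // val broken = Producer.flow(settings).map(r => r.message.passThrough.commit())

  // Does not compile either: the settings types must match the message types.
  // val mismatch = Producer.flow[String, Int, Offset](Settings[String, String]("demo"))

  // Compiles: all three type parameters are spelled out.
  val committing: Stage[Message[String, Int, Offset], String] =
    Producer.flow[String, Int, Offset](settings).map(r => r.message.passThrough.commit())

  def main(args: Array[String]): Unit =
    println(committing.run(Message("k", 2, Offset(42L)))) // prints "committed 42"
}
```

The second commented-out line corresponds to the producerSettings mismatch described above: settings built with two StringSerializers cannot feed a stage that receives Message[String, Int, CommittableOffset].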