如何将 "Kafka Streams Binder" 与 "Functional Style" 和 DI 一起使用?

How to use "Kafka Streams Binder" with "Functional Style" and DI?

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model 显示了一个示例,其中可以使用 属性 spring.cloud.stream.bindings.process_in.destination.

设置 输入主题

现在我想使用依赖注入,例如

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(JavaMailSender mailSender) {...}

启动应用程序时(基于 Spring 启动)属性 spring.cloud.stream.bindings.process_in.destination 被忽略,而是订阅了输入主题 input

编辑:这是 Kotlin 代码(没有导入)

Mailer.kt:

@Configuration
class Mailer {
    @Bean
    fun sendMail(/*mailSender: JavaMailSender*/) = Consumer<KStream<Any, Mail>> { input ->
        input.foreach { _, mail -> println("mail = $mail") }
    }
}

Mail.kt:

data class Mail(var from: String = "", var to: String = "", var subject: String = "", var body: String = "")

Application.kt:

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args) {
    }
}

application.yml::

spring.cloud.stream:
  bindings.sendMail_in.destination: mail
  kafka.binder.configuration.listeners: PLAINTEXT://localhost:9092

活页夹中有一些问题没有正确地 autowire 提供给 function/consumer bean 的 bean。不过,最新快照解决了这些问题。请确保您使用的是最新的快照 (3.0.0.BUILD-SNAPSHOT)。这是一个 sample application,适用于您提供的相同场景。