Spring Cloud Stream Kafka Streams Binder KafkaException: Could not start stream: 'listener' cannot be null
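I'm new to Kafka Streams and Spring Cloud Stream, but I've read good things about moving integration-related code into properties files so that developers can focus mainly on the business logic.
Here is my simple application class.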
package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}
My application.yml file is as follows:
spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"
The dependencies in my build.gradle.kts are defined as follows (only the relevant ones are included here):
extra["springCloudVersion"] = "2020.0.2"

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}
When I run the application, I get the following exception.
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
... 15 common frames omitted
Process finished with exit code 1
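Note that I know I still need to configure the Serde and Avro-related parts (I use Avro for the event schema), but the problem is that the stream won't even start.
Can anyone point me in the right direction? I tried googling this, but nobody seems to have posted about a problem caused by 'listener' cannot be null. Thanks!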
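destination: "some-event" should point to a Kafka topic, for example destination: "some-event-topic".
Then you have to create an interface for the listener consume-in-0. Using the Spring annotations makes the project load this listener, so it will no longer be null.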
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding {
    @Input("consume-in-0")
    KStream<String, String> inputStream();
}
Then create a @Service that handles the messages from the listener with @StreamListener("consume-in-0").
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {
    @StreamListener("consume-in-0")
    public void process(KStream<String, String> input) {
        input.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
    }
}
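Note: although @Gary Russell says this is a bug, I will complete my answer with the functional way of implementing the Spring service.
The functional style is enabled by defining the function in the application.yml file. There is an internal convention that builds each binding name from the function name plus the suffix in-0 or out-0, and you must use those names when defining the bindings. More details here.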
spring:
  cloud:
    stream:
      function:
        definition: transformToUpperCase
      bindings:
        transformToUpperCase-in-0:
          destination: input-func-topic
        transformToUpperCase-out-0:
          destination: output-func-topic
Then annotate your class with @Configuration and @EnableAutoConfiguration, and make sure the name of the lambda bean method matches the name you set for function.definition in the application.yml file.
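import java.util.function.Function;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Log4j2
@Configuration
@EnableAutoConfiguration
public class KafkaListenerFunctionalService {
    // The bean name must match function.definition in application.yml.
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> transformToUpperCase() {
        return input -> input
                .peek((k, v) -> log.info("Functional received Input: {}", v))
                .mapValues(i -> i.toUpperCase());
    }
}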
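To make the naming convention concrete, here is a small sketch of how the binding names in the YAML are derived from the function name. The BindingNames class and its methods are hypothetical helpers written for illustration; they are not part of Spring Cloud Stream.

```java
// Illustrative helper only; not part of Spring Cloud Stream.
final class BindingNames {
    private BindingNames() {}

    // Builds "<functionName>-in-<index>", e.g. the input binding of a function bean.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Builds "<functionName>-out-<index>", e.g. the output binding of a function bean.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("consume", 0));               // consume-in-0
        System.out.println(input("transformToUpperCase", 0));  // transformToUpperCase-in-0
        System.out.println(output("transformToUpperCase", 0)); // transformToUpperCase-out-0
    }
}
```

A Consumer such as consume only has an input binding, while a Function such as transformToUpperCase has both an input and an output binding.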
This is a bug; it is fixed in 3.1.3-SNAPSHOT.
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087
I'm not sure about the comment there; adding Micrometer to the class path should work around it.
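As a rough illustration of why the stack trace ends in 'listener' cannot be null, the toy model below mimics the shape of the two calls in the trace: a manager hands a metrics listener to a factory bean whose addListener rejects null. ToyFactoryBean and ToyManager are hypothetical names, the assumption that the listener is null when no meter registry is available is inferred from the workaround above, and the guarded variant is only one plausible fix; the actual change in 3.1.3 may differ.

```java
// Toy model of the startup failure; these classes are hypothetical and only
// mimic the shape of the calls seen in the stack trace.
final class ToyFactoryBean {
    private final java.util.List<Object> listeners = new java.util.ArrayList<>();

    // Rejects null, like the assertion reported in the trace.
    void addListener(Object listener) {
        if (listener == null) {
            throw new IllegalArgumentException("'listener' cannot be null");
        }
        listeners.add(listener);
    }

    int listenerCount() {
        return listeners.size();
    }
}

final class ToyManager {
    // Assumption: null when no meter registry (Micrometer) is available.
    private final Object metricsListener;

    ToyManager(Object metricsListener) {
        this.metricsListener = metricsListener;
    }

    // Unguarded variant: always registers the listener, so a null listener fails startup.
    void startUnguarded(ToyFactoryBean factoryBean) {
        factoryBean.addListener(metricsListener);
    }

    // Guarded variant: skips registration when there is no listener.
    void startGuarded(ToyFactoryBean factoryBean) {
        if (metricsListener != null) {
            factoryBean.addListener(metricsListener);
        }
    }

    public static void main(String[] args) {
        ToyFactoryBean factoryBean = new ToyFactoryBean();
        try {
            new ToyManager(null).startUnguarded(factoryBean);
        } catch (IllegalArgumentException e) {
            System.out.println("Could not start stream: " + e.getMessage());
        }
        new ToyManager(new Object()).startGuarded(factoryBean);
        System.out.println("listeners registered: " + factoryBean.listenerCount());
    }
}
```

In this model, supplying a non-null listener corresponds to having Micrometer and a meter registry available, which is why the workaround avoids the exception.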
I had the same problem. First, I added the io.micrometer dependency (the latest version from Maven).
Second, I created a SimpleMeterRegistry bean, and that solved the problem.
@Bean
SimpleMeterRegistry simpleMeterRegistry() {
    return new SimpleMeterRegistry();
}