多个 @EnableBinding 与 Kafka Spring Cloud Stream
Multiple @EnableBinding with Kafka Spring Cloud Stream
我正在尝试搭建一个监听 Kafka 的 Spring Boot 应用程序。
我正在使用 Kafka Streams Binder。
用一个简单的@EnableBinding
/**
 * Stream processor bound to the {@code input_1} and {@code output_1} channels
 * declared by {@link StreamProcessor}.
 */
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    /**
     * Logs the key and value of every record seen on the input stream and
     * returns the stream produced by {@code peek} for the output binding.
     *
     * @param input stream bound to {@link StreamProcessor#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor#OUTPUT}
     */
    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {
        logger.info("Stream listening");
        final KStream<String, String> observed = input.peek(this::logRecord);
        return observed;
    }

    /** Writes a single record's key and value to the log. */
    private void logRecord(String key, String value) {
        logger.info("key = {} value = {}", key, value);
    }

    /** Declares the names and types of the input and output bindings. */
    interface StreamProcessor {

        /** Name of the input binding. */
        String INPUT = "input_1";

        /** Name of the output binding. */
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
并在 application.yml
spring:
  cloud:
    stream:
      # Kafka Streams binder connection settings.
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      # Binding name -> Kafka topic mapping for both processors.
      bindings:
        input_1:
          destination: mytopic1
          group: readgroup
        output_1:
          destination: mytopic2
        input_2:
          destination: mytopic3
          group: readgroup
        output_2:
          destination: mytopic4
  application:
    name: stream_s1000_app
一切正常。
但是,如果我再添加第二个带有其他绑定的类,就会出现以下错误:
以下订阅主题未分配给任何成员:[mytopic1](The following subscribed topics are not assigned to any members: [mytopic1])
第二个绑定的例子:
/**
 * Second stream processor, bound to the {@code input_2} and {@code output_2}
 * channels declared by {@link StreamProcessor}.
 */
@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    /**
     * Logs the key and value of every record seen on the input stream and
     * returns the stream produced by {@code peek} for the output binding.
     *
     * @param input stream bound to {@link StreamProcessor#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor#OUTPUT}
     */
    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {
        logger.info("Stream listening binding two");
        final KStream<String, String> observed = input.peek(this::logRecord);
        return observed;
    }

    /** Writes a single record's key and value to the log. */
    private void logRecord(String key, String value) {
        logger.info("key = {} value = {}", key, value);
    }

    /** Declares the names and types of the input and output bindings. */
    interface StreamProcessor {

        /** Name of the input binding. */
        String INPUT = "input_2";

        /** Name of the output binding. */
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
我错过了什么?难道不能在同一个应用程序中使用多个输入主题和多个输出吗?这是否与 application.name 有关?
尝试
@EnableBinding( { StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class })
我刚刚用一个简单的应用试了一下,运行正常。当同一个应用程序中有多个处理器时,您需要确保每个处理器都有自己的应用程序 ID(application-id)。
请参阅下文,了解我如何在 application.yml 中为两个输入设置 2 个不同的应用程序 ID。
我看到两个处理器都在控制台上输出了日志。此外,还在输出主题上看到了消息。
/**
 * Spring Boot application hosting two stream processors in one class, each
 * wired to its own pair of input/output bindings.
 */
@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    /** Starts the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    /**
     * First processor: prints each record and returns the peeked stream.
     *
     * @param input stream bound to {@link StreamProcessor1#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor1#OUTPUT}
     */
    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {
        System.out.println("Stream listening");
        final KStream<String, String> observed = input.peek(So54522918Application::printRecord);
        return observed;
    }

    /**
     * Second processor: prints each record and returns the peeked stream.
     *
     * @param input stream bound to {@link StreamProcessor2#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor2#OUTPUT}
     */
    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {
        System.out.println("Stream listening binding two");
        final KStream<String, String> observed = input.peek(So54522918Application::printRecord);
        return observed;
    }

    /** Prints a single record's key and value to standard output. */
    private static void printRecord(String key, String value) {
        System.out.println("key = " + key + ", value = " + value);
    }

    /** Bindings used by {@link #process1(KStream)}. */
    interface StreamProcessor1 {

        /** Name of the input binding. */
        String INPUT = "input_1";

        /** Name of the output binding. */
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    /** Bindings used by {@link #process2(KStream)}. */
    interface StreamProcessor2 {

        /** Name of the input binding. */
        String INPUT = "input_2";

        /** Name of the output binding. */
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
application.yml 的相关部分:
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  # Default serdes applied by the Kafka Streams binder.
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  # A distinct application id per input binding, one for each processor.
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
# Binding name -> Kafka topic mapping.
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4
我正在尝试搭建一个监听 Kafka 的 Spring Boot 应用程序。
我正在使用 Kafka Streams Binder。
用一个简单的@EnableBinding
/**
 * Stream processor bound to the {@code input_1} and {@code output_1} channels
 * declared by {@link StreamProcessor}.
 */
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    /**
     * Logs the key and value of every record seen on the input stream and
     * returns the stream produced by {@code peek} for the output binding.
     *
     * @param input stream bound to {@link StreamProcessor#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor#OUTPUT}
     */
    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {
        logger.info("Stream listening");
        final KStream<String, String> observed = input.peek(this::logRecord);
        return observed;
    }

    /** Writes a single record's key and value to the log. */
    private void logRecord(String key, String value) {
        logger.info("key = {} value = {}", key, value);
    }

    /** Declares the names and types of the input and output bindings. */
    interface StreamProcessor {

        /** Name of the input binding. */
        String INPUT = "input_1";

        /** Name of the output binding. */
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
并在 application.yml
spring:
  cloud:
    stream:
      # Kafka Streams binder connection settings.
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      # Binding name -> Kafka topic mapping for both processors.
      bindings:
        input_1:
          destination: mytopic1
          group: readgroup
        output_1:
          destination: mytopic2
        input_2:
          destination: mytopic3
          group: readgroup
        output_2:
          destination: mytopic4
  application:
    name: stream_s1000_app
一切正常。
但是,如果我再添加第二个带有其他绑定的类,就会出现以下错误:
以下订阅主题未分配给任何成员:[mytopic1](The following subscribed topics are not assigned to any members: [mytopic1])
第二个绑定的例子:
/**
 * Second stream processor, bound to the {@code input_2} and {@code output_2}
 * channels declared by {@link StreamProcessor}.
 */
@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    /**
     * Logs the key and value of every record seen on the input stream and
     * returns the stream produced by {@code peek} for the output binding.
     *
     * @param input stream bound to {@link StreamProcessor#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor#OUTPUT}
     */
    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {
        logger.info("Stream listening binding two");
        final KStream<String, String> observed = input.peek(this::logRecord);
        return observed;
    }

    /** Writes a single record's key and value to the log. */
    private void logRecord(String key, String value) {
        logger.info("key = {} value = {}", key, value);
    }

    /** Declares the names and types of the input and output bindings. */
    interface StreamProcessor {

        /** Name of the input binding. */
        String INPUT = "input_2";

        /** Name of the output binding. */
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
我错过了什么?难道不能在同一个应用程序中使用多个输入主题和多个输出吗?这是否与 application.name 有关?
尝试
@EnableBinding( { StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class })
我刚刚用一个简单的应用试了一下,运行正常。当同一个应用程序中有多个处理器时,您需要确保每个处理器都有自己的应用程序 ID(application-id)。
请参阅下文,了解我如何在 application.yml 中为两个输入设置 2 个不同的应用程序 ID。
我看到两个处理器都在控制台上输出了日志。此外,还在输出主题上看到了消息。
/**
 * Spring Boot application hosting two stream processors in one class, each
 * wired to its own pair of input/output bindings.
 */
@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    /** Starts the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    /**
     * First processor: prints each record and returns the peeked stream.
     *
     * @param input stream bound to {@link StreamProcessor1#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor1#OUTPUT}
     */
    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {
        System.out.println("Stream listening");
        final KStream<String, String> observed = input.peek(So54522918Application::printRecord);
        return observed;
    }

    /**
     * Second processor: prints each record and returns the peeked stream.
     *
     * @param input stream bound to {@link StreamProcessor2#INPUT}
     * @return the peeked stream, sent to {@link StreamProcessor2#OUTPUT}
     */
    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {
        System.out.println("Stream listening binding two");
        final KStream<String, String> observed = input.peek(So54522918Application::printRecord);
        return observed;
    }

    /** Prints a single record's key and value to standard output. */
    private static void printRecord(String key, String value) {
        System.out.println("key = " + key + ", value = " + value);
    }

    /** Bindings used by {@link #process1(KStream)}. */
    interface StreamProcessor1 {

        /** Name of the input binding. */
        String INPUT = "input_1";

        /** Name of the output binding. */
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    /** Bindings used by {@link #process2(KStream)}. */
    interface StreamProcessor2 {

        /** Name of the input binding. */
        String INPUT = "input_2";

        /** Name of the output binding. */
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
application.yml 的相关部分:
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  # Default serdes applied by the Kafka Streams binder.
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  # A distinct application id per input binding, one for each processor.
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
# Binding name -> Kafka topic mapping.
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4