Kafka - spring cloud stream
I am trying to use spring-cloud-stream with Kafka. Below is the sample code, but it does not seem to do anything. It always creates a topic called 'output', and the values are never published.
application.yaml
spring.cloud.stream:
  function:
    definition: streamSupplier
  bindings:
    streamSupplier-out-0:
      destination: numbers
My goal is just to produce values.
@SpringBootApplication
@EnableBinding(Source.class)
public class CloudStreamDemoApplication {

    private AtomicInteger atomicInteger = new AtomicInteger();

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamDemoApplication.class, args);
    }

    @Bean
    public Supplier<Integer> streamSupplier(){
        return () -> {
            System.out.println("Publishing : " + atomicInteger.incrementAndGet());
            return atomicInteger.get();
        };
    }
}
Dependencies - 2.2.6.RELEASE
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
You need to remove @EnableBinding(Source.class) from the class. If it is present, functional binding will not happen.
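As a rough illustration of the supplier once the annotation is gone, here is a standalone sketch. It deliberately leaves Spring out so that it compiles with the JDK alone; the class name StreamSupplierSketch is made up for this example, and the comments mark where the Spring annotations would go in the real application. It also reads the counter only once per call, so the logged value and the returned value cannot drift apart.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical standalone class: Spring annotations appear only in comments
// so the sketch runs without any Spring dependency on the classpath.
final class StreamSupplierSketch {

    // In the Spring Boot application this method would carry @Bean, and the
    // enclosing class would have @SpringBootApplication but no @EnableBinding.
    static Supplier<Integer> streamSupplier() {
        AtomicInteger counter = new AtomicInteger();
        return () -> {
            // Increment once and reuse the result for both logging and returning.
            int next = counter.incrementAndGet();
            System.out.println("Publishing : " + next);
            return next;
        };
    }

    public static void main(String[] args) {
        Supplier<Integer> supplier = streamSupplier();
        supplier.get(); // first value
        supplier.get(); // second value
    }
}
```

In the actual application the framework, not a main method, would invoke the supplier and send each returned value to the bound destination.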
The @EnableBinding annotation is what causes the problem described above.
Read the following excerpt from the Spring documentation:
Unlike previous versions of spring-cloud-stream which relied on @EnableBinding and @StreamListener annotations, the above example looks no different then any vanilla spring-boot application. It defines a single bean of type Function and that it is. So, how does it became spring-cloud-stream application? It becomes spring-cloud-stream application simply based on the presence of spring-cloud-stream and binder dependencies and auto-configuration classes on the classpath effectively setting the context for your boot application as spring-cloud-stream application. And in this context beans of type Supplier, Function or Consumer are treated as defacto message handlers triggering binding of to destinations exposed by the provided binder following certain naming conventions and rules to avoid extra configuration.
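The "naming conventions" mentioned in the excerpt are what tie the bean name streamSupplier to the binding key streamSupplier-out-0 in application.yaml: the function name, then -in- or -out-, then an index. The small sketch below only spells that pattern out; BindingNames and its methods are hypothetical helpers written for this illustration, not part of spring-cloud-stream.

```java
// Hypothetical helper, not a spring-cloud-stream API: it only reproduces the
// <functionName>-<in|out>-<index> pattern used for functional binding names.
final class BindingNames {

    // Name of the output binding for a Supplier or Function bean.
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Name of the input binding for a Function or Consumer bean.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    public static void main(String[] args) {
        // Prints "streamSupplier-out-0", the key used under bindings in application.yaml.
        System.out.println(outputBinding("streamSupplier", 0));
    }
}
```

If the key under bindings does not match this pattern for the bean name, the destination setting would not be applied to that function's binding.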