Spring Cloud Stream - consumer group binding
My consumer is bound to an anonymous consumer group instead of the consumer group I specified.
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181
        bindings:
          inEvent:
            group: eventin
            destination: event
          outEvent:
            group: eventout
            destination: processevent
My Spring Boot application:
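import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Invoked for every message that arrives on the "inEvent" binding.
    @StreamListener(value = "inEvent")
    public void getEvent(Event event) {
        System.out.println(event.name);
    }
}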
My input/output channel interface:
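import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface EventStream {
    @Input("inEvent")
    SubscribableChannel inEvent();

    @Output("outEvent")
    MessageChannel outEvent();
}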
My console log:
: Started ConsumerApplication in 3.233 seconds (JVM running for 4.004)
: [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] Discovered group coordinator singh:9092 (id: 2147483647 rack: null)
: [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] Revoking previously assigned partitions []
: partitions revoked: []
: [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] (Re-)joining group
: [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] Successfully joined group with generation 1
: [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] Setting newly assigned partitions [inEvent-0]
: [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] Resetting offset for partition inEvent-0 to offset 2.
: partitions assigned: [inEvent-0]
The group property must not be in the kafka tree. It must look like this:
spring:
  cloud:
    stream:
      bindings:
        inEvent:
          group: eventin
          destination: event
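group is a common property, so it is set the same way regardless of the binder implementation. The kafka tree is for Apache Kafka-specific properties, which are exposed at the binder implementation level.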
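To make the difference between the two property trees concrete, here is a minimal sketch in plain Java. It is only a model of the lookup, not Spring Cloud Stream's actual code: the class GroupLookupSketch and the helper resolveGroup are hypothetical names, and the fallback simply imitates the anonymous.<UUID> group ID visible in the log above.

```java
import java.util.Map;
import java.util.UUID;

public class GroupLookupSketch {

    // Common (binder-agnostic) binding properties live under this prefix.
    static final String COMMON_PREFIX = "spring.cloud.stream.bindings.";

    // Hypothetical helper: returns the group configured for a binding, or an
    // anonymous group name when the common key is absent.
    static String resolveGroup(Map<String, String> props, String binding) {
        String group = props.get(COMMON_PREFIX + binding + ".group");
        return group != null ? group : "anonymous." + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Flattened form of the configuration from the question.
        Map<String, String> misplaced = Map.of(
                "spring.cloud.stream.kafka.bindings.inEvent.group", "eventin");
        // Flattened form of the corrected configuration.
        Map<String, String> corrected = Map.of(
                "spring.cloud.stream.bindings.inEvent.group", "eventin");

        System.out.println(resolveGroup(misplaced, "inEvent")); // anonymous.<random UUID>
        System.out.println(resolveGroup(corrected, "inEvent")); // eventin
    }
}
```

The same reasoning would apply to destination, which is also a common property. That would explain why the log shows partitions of a topic named inEvent (the binding name) rather than event.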