如何从主题末尾读取,而不考虑组的提交偏移量
How to read from the end of a topic, regardless of the group's committed offset
我正在使用以下包来使用 kafka 消息
compile 'org.springframework.boot:spring-boot-starter-webflux'
compile("org.springframework.boot:spring-boot-starter-web")
// tag::actuator[]
compile("org.springframework.boot:spring-boot-starter-actuator")
compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'
我想从主题末尾开始消费消息,而不考虑组的提交偏移量
当我搜索时,我发现我们可以使用以下代码
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());
但是我找不到如何在 spring boot webflux
中使用上面的代码
@Component
public class EventConsumer
{
private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();
public Flux<ServerSentEvent<String>> get()
{
return emitter;
}
@KafkaListener(topics = "${kafka.zone.status.topic.name}")
public void receive(String data)
{
//System.out.println(data);
emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
}
}
实施 ConsumerSeekAware
并在 onPartitionsAssigned
方法中执行查找。
@Component
public class EventConsumer implements ConsumerSeekAware {
private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();
public Flux<ServerSentEvent<String>> get() {
return emitter;
}
@KafkaListener(topics = "${kafka.zone.status.topic.name}")
public void receive(String data) {
// System.out.println(data);
emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
我正在使用以下包来使用 kafka 消息
compile 'org.springframework.boot:spring-boot-starter-webflux'
compile("org.springframework.boot:spring-boot-starter-web")
// tag::actuator[]
compile("org.springframework.boot:spring-boot-starter-actuator")
compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'
我想从主题末尾开始消费消息,而不考虑组的提交偏移量
当我搜索时,我发现我们可以使用以下代码
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());
但是我找不到如何在 spring boot webflux
中使用上面的代码@Component
public class EventConsumer
{
private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();
public Flux<ServerSentEvent<String>> get()
{
return emitter;
}
@KafkaListener(topics = "${kafka.zone.status.topic.name}")
public void receive(String data)
{
//System.out.println(data);
emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
}
}
实施 ConsumerSeekAware
并在 onPartitionsAssigned
方法中执行查找。
@Component
public class EventConsumer implements ConsumerSeekAware {
private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();
public Flux<ServerSentEvent<String>> get() {
return emitter;
}
@KafkaListener(topics = "${kafka.zone.status.topic.name}")
public void receive(String data) {
// System.out.println(data);
emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}