Kafka & Spring Batch - How to read ONLY uncommitted messages from the same topic?
I'm doing small-scale batch processing with Spring Batch and Kafka: reading JSON data from a Kafka topic, converting it into Student objects, changing a value, and sending the result back to another Kafka topic.
Everything works fine, but my only problem is that my consumer ALWAYS reads from the beginning of the topic. I need it to start from the last unconsumed message.
I have already set these properties:
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
But this doesn't seem to work: when the consumer starts, it processes all the messages again. Does anyone know how to handle this with Spring Batch and Kafka? Here is my code:
BatchStudent.java:
@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {

    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());
        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .reader(reader())
                .processor(customItemProcessor())
                .writer(writer())
                .build();
    }
}
app.yml:
spring.batch.initialize-schema: always
#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student
#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092
#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student
Student.java:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}
CustomProcessor.java:
@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentReceived) {
        final Student studentSent = new Student();
        studentSent.setId(studentReceived.getId());
        studentSent.setCount(200);
        return studentSent;
    }
}
Thanks for your help!
Spring Batch 4.3 introduced a way to consume records from the offset stored in Kafka. I talked about this feature in my Spring One talk last year: What's new in Spring Batch 4.3?. You can configure the Kafka reader with a custom starting offset in each partition by using setPartitionOffsets:
Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.
You can find a complete example in this test case.
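Applied to the reader() bean from the question, this would mean passing an empty map to partitionOffsets so the reader resumes from the offsets committed in Kafka for the consumer group. This is a sketch, not a tested drop-in: it assumes Spring Batch 4.3+, and the group id "student-group" is taken from the commented-out line in the question's app.yml (a stable group id is required, since a random one defeats offset storage):

```java
@Bean
public KafkaItemReader<Integer, Student> reader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    // A stable group id is required so Kafka can store and restore committed offsets
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "student-group");
    return new KafkaItemReaderBuilder<Integer, Student>()
            .partitions(0)
            .consumerProperties(props)
            .name("students-consumer-reader")
            .saveState(true)
            .topic(topic)
            // Empty map: start from the offsets stored in Kafka for this consumer group
            .partitionOffsets(new HashMap<>())
            .build();
}
```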