是否可以分别使用来自 kafka 主题的每条消息?
Is it posible to consume each message from a kafka topic separately?
好的,目标如下:
我有一个应该发送邮件的服务,如果失败,我的 Kafka 生产者会将这封邮件发送到 Kafka 主题。第二个程序每两分钟查看一次主题,应该只使用一条消息(最旧的消息)并重试发送它,如果失败,程序应该将这条消息带回主题。
我已经有一个消费者了,但问题是,它消费了我到现在为止还没有消费过的所有消息。但是我想让他只消费最旧的那个,他之前没有消费过。
这是我的实际消费者:
"customMessage" 是我为测试而创建的 class,它只是一个具有不同属性(如日期、消息和其他内容)的对象。
为了测试代码稍微修改了一下,所以这个消费者是 运行 endles。它也只显示消息而不是处理它,所以只有“System.out.println(message.displayMessage());
”。
publicclass消费者一号{
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "happyConsumer");
properties.put("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "false");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("mymailtopic"));
while (true) {
ObjectMapper om = new ObjectMapper();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
customMessage message=null;
try {
message = om.readValue(record.value(), customMessage.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
System.out.println(message.displayMessage());
}
}
}
}
使用实际代码,该服务也能正常工作,但我准备至少尝试一下它是否也能像我在第一段中提到的那样工作。所以,我的问题是:
- 甚至可以只使用主题中的一条消息吗?
- 如果是,我该怎么做?
我正在为整个服务使用 Java 和 quarkus(我认为是最新版本的 Kafka)。
您可以使用 max.poll.records 属性 逐条消费消息,在配置
中将此 属性 设置为 1
The maximum number of records returned in a single call to poll().
properties.put("max.poll.records", 1);
好的,目标如下: 我有一个应该发送邮件的服务,如果失败,我的 Kafka 生产者会将这封邮件发送到 Kafka 主题。第二个程序每两分钟查看一次主题,应该只使用一条消息(最旧的消息)并重试发送它,如果失败,程序应该将这条消息带回主题。
我已经有一个消费者了,但问题是,它消费了我到现在为止还没有消费过的所有消息。但是我想让他只消费最旧的那个,他之前没有消费过。
这是我的实际消费者:
"customMessage" 是我为测试而创建的 class,它只是一个具有不同属性(如日期、消息和其他内容)的对象。
为了测试代码稍微修改了一下,所以这个消费者是 运行 endles。它也只显示消息而不是处理它,所以只有“System.out.println(message.displayMessage());
”。
publicclass消费者一号{
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "happyConsumer");
properties.put("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "false");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("mymailtopic"));
while (true) {
ObjectMapper om = new ObjectMapper();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
customMessage message=null;
try {
message = om.readValue(record.value(), customMessage.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
System.out.println(message.displayMessage());
}
}
}
}
使用实际代码,该服务也能正常工作,但我准备至少尝试一下它是否也能像我在第一段中提到的那样工作。所以,我的问题是:
- 甚至可以只使用主题中的一条消息吗?
- 如果是,我该怎么做?
我正在为整个服务使用 Java 和 quarkus(我认为是最新版本的 Kafka)。
您可以使用 max.poll.records 属性 逐条消费消息,在配置
中将此 属性 设置为1
The maximum number of records returned in a single call to poll().
properties.put("max.poll.records", 1);