如何在kafka中初始化kafka ConsumerRecords<String,String>进行测试
How can I initialize kafka ConsumerRecords<String,String> in kafka for testing
我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll()
returns ConsumerRecords<String,String>
的实例。我想初始化 ConsumerRecords
并在模拟中使用它,但是 ConsumerRecords
的构造函数期望我在测试中没有的实际 kafka 主题。
我认为的一种方法是保留对象的序列化副本并反序列化以初始化 ConsumerRecords
。
有没有其他方法可以达到同样的效果。
这是一些示例代码(Kafka 客户端库版本 0.10.1.1):
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
...
String topic = "MyTopic";
Collection<TopicPartition> partitions = new ArrayList<TopicPartition>();
Collection<String> topicsCollection = new ArrayList<String>();
partitions.add(new TopicPartition(topic, 1));
Map<TopicPartition, Long> partitionsBeginningMap = new HashMap<TopicPartition, Long>();
Map<TopicPartition, Long> partitionsEndMap = new HashMap<TopicPartition, Long>();
long records = 10;
for (TopicPartition partition : partitions) {
partitionsBeginningMap.put(partition, 0l);
partitionsEndMap.put(partition, records);
topicsCollection.add(partition.topic());
}
MockConsumer<String, MyObject> second = new MockConsumer<String, MyObject>(
OffsetResetStrategy.EARLIEST);
second.subscribe(topicsCollection);
second.rebalance(partitions);
second.updateBeginningOffsets(partitionsBeginningMap);
second.updateEndOffsets(partitionsEndMap);
for (long i = 0; i < 10; i++) {
MyObject value = Generator.generate();
ConsumerRecord<String, MyObject> record = new ConsumerRecord<String, MyObject>(
topic, 1, i, null,value);
second.addRecord(record);
}
...
我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll()
returns ConsumerRecords<String,String>
的实例。我想初始化 ConsumerRecords
并在模拟中使用它,但是 ConsumerRecords
的构造函数期望我在测试中没有的实际 kafka 主题。
我认为的一种方法是保留对象的序列化副本并反序列化以初始化 ConsumerRecords
。
有没有其他方法可以达到同样的效果。
这是一些示例代码(Kafka 客户端库版本 0.10.1.1):
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
...
String topic = "MyTopic";
Collection<TopicPartition> partitions = new ArrayList<TopicPartition>();
Collection<String> topicsCollection = new ArrayList<String>();
partitions.add(new TopicPartition(topic, 1));
Map<TopicPartition, Long> partitionsBeginningMap = new HashMap<TopicPartition, Long>();
Map<TopicPartition, Long> partitionsEndMap = new HashMap<TopicPartition, Long>();
long records = 10;
for (TopicPartition partition : partitions) {
partitionsBeginningMap.put(partition, 0l);
partitionsEndMap.put(partition, records);
topicsCollection.add(partition.topic());
}
MockConsumer<String, MyObject> second = new MockConsumer<String, MyObject>(
OffsetResetStrategy.EARLIEST);
second.subscribe(topicsCollection);
second.rebalance(partitions);
second.updateBeginningOffsets(partitionsBeginningMap);
second.updateEndOffsets(partitionsEndMap);
for (long i = 0; i < 10; i++) {
MyObject value = Generator.generate();
ConsumerRecord<String, MyObject> record = new ConsumerRecord<String, MyObject>(
topic, 1, i, null,value);
second.addRecord(record);
}
...