如何一起对 Kafka Streams 和 Producer API 进行单元测试
How to unit test the Kafka Streams and Producer APIs together
目前,我有一个基本的 Kafka 流应用程序,它涉及一个只有源和处理器但没有接收器的拓扑。本质上,拓扑只处理消息的消费。至于生成消息,我们在传递给 Topology 的 ProcessorSupplier 实例中调用 Producer API,特别是在覆盖的 process
方法中。虽然我知道 Producer API 在这里是多余的,因为我可以简单地向拓扑添加一个接收器,但我处于必须以这种方式设置我的流应用程序的位置。至于测试,我尝试了 kafka-streams-test-utils 包中提供的 TopologyTestDriver
class。但是,我不仅要测试拓扑,还要测试对 Producer API 的调用。使用 TopologyTestDriver
需要我模拟我的 Producer
实例,因为它与 Streams API 是分开的。结果,由于消息不是 "fowarded",我无法从 TopologyTestDriver
读取消息以进行单元测试。
这是我的 process
方法的简化版本:
@Override
public void process(String key, String value) {
// some data processing stuff that I leave out for simplicity sake
String topic = "...";
Properties props = ...;
//Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
producer.send(record);
}
这里是我的示例单元测试的简化:
@Test
public void process() {
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("processor", ..., "source");
Properties props = ...;
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
// the following line will work fine as long as the producer is mocked
testDriver.pipeInput(factory.create("input-topic", "key", "value"));
// since the producer is mocked, no message can be read from the output topic
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
assertNull(outputRecord); // returns true
}
总结一下我的问题,有没有一种方法可以编写单元测试来测试拓扑中消息的消费和生成,该拓扑使用生产者 API 将消息写入传出主题?
您不应使用自定义 Producer
,而应向您的 Topology
添加一个接收器。对 Producer.send()
的调用是异步的,因此您可能会丢失数据。为避免数据丢失,您需要使调用同步,即获取 send()
返回的 Future
并在 process()
returns 之前等待其完成。但是,这对你的吞吐量有很大的影响,不推荐。
如果您添加一个接收器,您就可以避免这些问题,因为 Kafka Streams 现在将了解发送到输出主题的数据,因此不会发生数据丢失,而 Kafka Streams 可以使用性能更高的异步调用。
除了正确性问题,您似乎为您在当前代码中处理的每条消息都创建了一个新的 KafkaProducer
,这是非常低效的。此外,使用接收器将简化您的代码。当然,您可以使用 TopologyTestDriver
.
获得适当的测试功能
目前,我有一个基本的 Kafka 流应用程序,它涉及一个只有源和处理器但没有接收器的拓扑。本质上,拓扑只处理消息的消费。至于生成消息,我们在传递给 Topology 的 ProcessorSupplier 实例中调用 Producer API,特别是在覆盖的 process
方法中。虽然我知道 Producer API 在这里是多余的,因为我可以简单地向拓扑添加一个接收器,但我处于必须以这种方式设置我的流应用程序的位置。至于测试,我尝试了 kafka-streams-test-utils 包中提供的 TopologyTestDriver
class。但是,我不仅要测试拓扑,还要测试对 Producer API 的调用。使用 TopologyTestDriver
需要我模拟我的 Producer
实例,因为它与 Streams API 是分开的。结果,由于消息不是 "fowarded",我无法从 TopologyTestDriver
读取消息以进行单元测试。
这是我的 process
方法的简化版本:
@Override
public void process(String key, String value) {
// some data processing stuff that I leave out for simplicity sake
String topic = "...";
Properties props = ...;
//Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
producer.send(record);
}
这里是我的示例单元测试的简化:
@Test
public void process() {
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("processor", ..., "source");
Properties props = ...;
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
// the following line will work fine as long as the producer is mocked
testDriver.pipeInput(factory.create("input-topic", "key", "value"));
// since the producer is mocked, no message can be read from the output topic
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
assertNull(outputRecord); // returns true
}
总结一下我的问题,有没有一种方法可以编写单元测试来测试拓扑中消息的消费和生成,该拓扑使用生产者 API 将消息写入传出主题?
您不应使用自定义 Producer
,而应向您的 Topology
添加一个接收器。对 Producer.send()
的调用是异步的,因此您可能会丢失数据。为避免数据丢失,您需要使调用同步,即获取 send()
返回的 Future
并在 process()
returns 之前等待其完成。但是,这对你的吞吐量有很大的影响,不推荐。
如果您添加一个接收器,您就可以避免这些问题,因为 Kafka Streams 现在将了解发送到输出主题的数据,因此不会发生数据丢失,而 Kafka Streams 可以使用性能更高的异步调用。
除了正确性问题,您似乎为您在当前代码中处理的每条消息都创建了一个新的 KafkaProducer
,这是非常低效的。此外,使用接收器将简化您的代码。当然,您可以使用 TopologyTestDriver
.