如何在骆驼路线上模拟 Kafka 消费者端点?
How to Mock a Kafka Consumer endpoint on a Camel Route?
我有一个 Camel 端点,它基本上是一个 Kafka 消费者从主题中读取信息并将信息发送到数据库。它工作正常,但是,由于无法模拟 Kafka 端点,我正在努力对其进行单元测试。谁能帮我在骆驼路线上模拟 Kafka 消费者?
@Override
public void configure() {
from(kafka:eph?brokers=localhost:9092...).routeId("KafkaConsumer")
.to(direct:updateDatabase)
}
要对您的路线进行单元测试,您可以使用标准的 camel spring 引导测试来完成。
在测试期间,Kafka 生产者(在 Camel 看来)可以与直接组件交换,模拟消息可以在那里传递。要查看您的路由是否正确处理了这些消息,可以使用 Mock endpoints。
//Route definition
@Component
public class KafkaRoute extends RouteBuilder {
public static final String KAFKA_ROUTE_NAME = "kafka-route";
@Override
public void configure() throws Exception {
from("kafka:eph?brokers=localhost:9092").routeId(KAFKA_ROUTE_NAME)
.log(LoggingLevel.INFO, "Message: ${body} received on the topic: ${headers[kafka.TOPIC]} ")
.to("direct:updateDatabase");
from("direct:updateDatabase").log(LoggingLevel.INFO, "DB Updated.");
}
}
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.spring.CamelSpringBootRunner;
import org.apache.camel.test.spring.MockEndpoints;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@MockEndpoints("direct:*")
public class KafkaRouteTest {
@Autowired
CamelContext camelContext;
@Produce
ProducerTemplate mockKafkaProducer;
@EndpointInject("mock:direct:updateDatabase")
MockEndpoint finalSink;
@Test
public void testKafkaRoute() throws Exception {
//Here we swap the FROM component in the KafkaRoute.KAFKA_ROUTE_NAME with a direct component, direct:kafka-from
AdviceWithRouteBuilder.adviceWith(camelContext, KafkaRoute.KAFKA_ROUTE_NAME, routeBuilder -> {
routeBuilder.replaceFromWith("direct:kafka-from");
});
Map<String, Object> headers = new HashMap<>();
headers.put(KafkaConstants.TOPIC, "testTopic");
//Send mock message to the route
mockKafkaProducer.sendBodyAndHeaders("direct:kafka-from", "test-body", headers);
//Assertions. You may do additional assertions with the likes of Mockito
finalSink.expectedBodiesReceived("test-body");
finalSink.expectedHeaderReceived(KafkaConstants.TOPIC, "testTopic");
finalSink.assertIsSatisfied();
}
}
Camel Kafka 组件已经 unit tested, there is no point in replicating all those tests in your code base. However, if you really want to do testing against a real Kafka instance, you may use test containers. Here 是一个完整的示例,来自 Camel 存储库本身,使用测试容器。
在 属性 中简单地外部化端点 URI(例如使用 Spring 属性 设施)
from(consumerEndpoint).routeId("KafkaConsumer")
然后在您的生产配置中,使用真实端点
consumerEndpoint=kafka:eph?brokers=localhost:9092...
而在您的测试配置中,您使用的是直接端点
consumerEndpoint=direct:consumer
这个很容易从 Camel 路由测试中触发
producer.sendBody("direct:consumer", myMessageBody);
我有一个 Camel 端点,它基本上是一个 Kafka 消费者从主题中读取信息并将信息发送到数据库。它工作正常,但是,由于无法模拟 Kafka 端点,我正在努力对其进行单元测试。谁能帮我在骆驼路线上模拟 Kafka 消费者?
@Override
public void configure() {
from(kafka:eph?brokers=localhost:9092...).routeId("KafkaConsumer")
.to(direct:updateDatabase)
}
要对您的路线进行单元测试,您可以使用标准的 camel spring 引导测试来完成。 在测试期间,Kafka 生产者(在 Camel 看来)可以与直接组件交换,模拟消息可以在那里传递。要查看您的路由是否正确处理了这些消息,可以使用 Mock endpoints。
//Route definition
@Component
public class KafkaRoute extends RouteBuilder {
public static final String KAFKA_ROUTE_NAME = "kafka-route";
@Override
public void configure() throws Exception {
from("kafka:eph?brokers=localhost:9092").routeId(KAFKA_ROUTE_NAME)
.log(LoggingLevel.INFO, "Message: ${body} received on the topic: ${headers[kafka.TOPIC]} ")
.to("direct:updateDatabase");
from("direct:updateDatabase").log(LoggingLevel.INFO, "DB Updated.");
}
}
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.spring.CamelSpringBootRunner;
import org.apache.camel.test.spring.MockEndpoints;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@MockEndpoints("direct:*")
public class KafkaRouteTest {
@Autowired
CamelContext camelContext;
@Produce
ProducerTemplate mockKafkaProducer;
@EndpointInject("mock:direct:updateDatabase")
MockEndpoint finalSink;
@Test
public void testKafkaRoute() throws Exception {
//Here we swap the FROM component in the KafkaRoute.KAFKA_ROUTE_NAME with a direct component, direct:kafka-from
AdviceWithRouteBuilder.adviceWith(camelContext, KafkaRoute.KAFKA_ROUTE_NAME, routeBuilder -> {
routeBuilder.replaceFromWith("direct:kafka-from");
});
Map<String, Object> headers = new HashMap<>();
headers.put(KafkaConstants.TOPIC, "testTopic");
//Send mock message to the route
mockKafkaProducer.sendBodyAndHeaders("direct:kafka-from", "test-body", headers);
//Assertions. You may do additional assertions with the likes of Mockito
finalSink.expectedBodiesReceived("test-body");
finalSink.expectedHeaderReceived(KafkaConstants.TOPIC, "testTopic");
finalSink.assertIsSatisfied();
}
}
Camel Kafka 组件已经 unit tested, there is no point in replicating all those tests in your code base. However, if you really want to do testing against a real Kafka instance, you may use test containers. Here 是一个完整的示例,来自 Camel 存储库本身,使用测试容器。
在 属性 中简单地外部化端点 URI(例如使用 Spring 属性 设施)
from(consumerEndpoint).routeId("KafkaConsumer")
然后在您的生产配置中,使用真实端点
consumerEndpoint=kafka:eph?brokers=localhost:9092...
而在您的测试配置中,您使用的是直接端点
consumerEndpoint=direct:consumer
这个很容易从 Camel 路由测试中触发
producer.sendBody("direct:consumer", myMessageBody);