Kafka Spring:如何为 ConcurrentKafkaListenerContainerFactory 和 ConcurrentMessageListenerContainer 编写单元测试?
Kafka Spring: How to write unit tests for ConcurrentKafkaListenerContainerFactory and ConcurrentMessageListenerContainer?
我有 2 个 class; 1 个用于工厂,另一个用于侦听器容器:
public class ConsumerFactories() {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Byte[]> adeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Byte[]> factory = null;
factory = new ConcurrentKafkaListenerContainerFactory<String, Byte[]>();
factory.setConsumerFactory(consumerFactory1());
factory.setConsumerFactory(consumerFactory2());
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
我的监听器 class 有多个容器:
@Bean
public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
final ContainerProperties containerProperties =
new ContainerProperties("topic1");
containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
@Override
public void onMessage(ConsumerRecord<String, byte[]> record) {
System.out.println("Thread is: " + Thread.currentThread().getName());
}
});
ConcurrentMessageListenerContainer<String, byte[]> container =
new ConcurrentMessageListenerContainer<>(consumerFactory1, containerProperties);
container.setBeanName("bean1");
container.setConcurrency(60);
container.start();
return container;
}
@Bean
public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
final ContainerProperties containerProperties =
new ContainerProperties("topic1");
containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
@Override
public void onMessage(ConsumerRecord<String, byte[]> record) {
System.out.println("Thread is: " + Thread.currentThread().getName());
}
});
ConcurrentMessageListenerContainer<String, byte[]> container =
new ConcurrentMessageListenerContainer<>(consumerFactory2, containerProperties);
container.setBeanName("bean2");
container.setConcurrency(60);
container.start();
return container;
}
1) 如何为这 2 classes 和方法编写单元测试?
2) 由于我所有的侦听器容器都在做相同的处理工作,但针对的是一组不同的主题,我可以在设置 consumerFactory 或任何其他方式时传递主题吗?
1.
container.start();
从不 start()
bean 定义中的组件 - 应用程序上下文尚未准备好;容器将在正确的时间自动启动容器(只要 autoStartup
为真(默认)。
- 如果您自己创建容器,为什么还需要容器工厂?
不清楚你要测试什么。
编辑
这是一个以编程方式注册容器的示例,使用 Spring Boot 的自动配置容器工厂(2.2 及更高版本)...
@SpringBootApplication
public class So53752783Application {
public static void main(String[] args) {
SpringApplication.run(So53752783Application.class, args);
}
@SuppressWarnings("unchecked")
@Bean
public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener((MessageListener<String, String>) record -> {
System.out.println("Received " + record);
});
container.getContainerProperties().setGroupId(topic + ".group");
container = (ConcurrentMessageListenerContainer<String, String>)
beanFactory.initializeBean(container, topic + ".container");
beanFactory.registerSingleton(topic + ".container", container);
container.start();
});
}
}
要对您的侦听器进行单元测试,
container.getContainerProperties().getMessagelistener()
转换并调用 onMessage()
并验证它是否如您所愿。
EDIT2 单元测试监听器
@SpringBootApplication
public class So53752783Application {
public static void main(String[] args) {
SpringApplication.run(So53752783Application.class, args);
}
@SuppressWarnings("unchecked")
@Bean
public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
MyListener listener) {
return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(listener);
container.getContainerProperties().setGroupId(topic + ".group");
container = (ConcurrentMessageListenerContainer<String, String>)
beanFactory.initializeBean(container, topic + ".container");
beanFactory.registerSingleton(topic + ".container", container);
container.start();
});
}
@Bean
public MyListener listener() {
return new MyListener();
}
public static class MyListener implements MessageListener<String, String> {
@Autowired
private Service service;
public void setService(Service service) {
this.service = service;
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
this.service.callSomeService(data.value().toUpperCase());
}
}
public interface Service {
void callSomeService(String in);
}
@Component
public static class DefaultService implements Service {
@Override
public void callSomeService(String in) {
// ...
}
}
}
和
@RunWith(SpringRunner.class)
@SpringBootTest
public class So53752783ApplicationTests {
@Autowired
private ApplicationContext context;
@Test
public void test() {
ConcurrentMessageListenerContainer<?, ?> container = context.getBean("foo.container",
ConcurrentMessageListenerContainer.class);
MyListener messageListener = (MyListener) container.getContainerProperties().getMessageListener();
Service service = mock(Service.class);
messageListener.setService(service);
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, "key", "foo"));
verify(service).callSomeService("FOO");
}
}
我有 2 个 class; 1 个用于工厂,另一个用于侦听器容器:
public class ConsumerFactories() {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Byte[]> adeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Byte[]> factory = null;
factory = new ConcurrentKafkaListenerContainerFactory<String, Byte[]>();
factory.setConsumerFactory(consumerFactory1());
factory.setConsumerFactory(consumerFactory2());
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
我的监听器 class 有多个容器:
@Bean
public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
final ContainerProperties containerProperties =
new ContainerProperties("topic1");
containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
@Override
public void onMessage(ConsumerRecord<String, byte[]> record) {
System.out.println("Thread is: " + Thread.currentThread().getName());
}
});
ConcurrentMessageListenerContainer<String, byte[]> container =
new ConcurrentMessageListenerContainer<>(consumerFactory1, containerProperties);
container.setBeanName("bean1");
container.setConcurrency(60);
container.start();
return container;
}
@Bean
public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
final ContainerProperties containerProperties =
new ContainerProperties("topic1");
containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
@Override
public void onMessage(ConsumerRecord<String, byte[]> record) {
System.out.println("Thread is: " + Thread.currentThread().getName());
}
});
ConcurrentMessageListenerContainer<String, byte[]> container =
new ConcurrentMessageListenerContainer<>(consumerFactory2, containerProperties);
container.setBeanName("bean2");
container.setConcurrency(60);
container.start();
return container;
}
1) 如何为这 2 classes 和方法编写单元测试?
2) 由于我所有的侦听器容器都在做相同的处理工作,但针对的是一组不同的主题,我可以在设置 consumerFactory 或任何其他方式时传递主题吗?
1.
container.start();
从不 start()
bean 定义中的组件 - 应用程序上下文尚未准备好;容器将在正确的时间自动启动容器(只要 autoStartup
为真(默认)。
- 如果您自己创建容器,为什么还需要容器工厂?
不清楚你要测试什么。
编辑
这是一个以编程方式注册容器的示例,使用 Spring Boot 的自动配置容器工厂(2.2 及更高版本)...
@SpringBootApplication
public class So53752783Application {
public static void main(String[] args) {
SpringApplication.run(So53752783Application.class, args);
}
@SuppressWarnings("unchecked")
@Bean
public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener((MessageListener<String, String>) record -> {
System.out.println("Received " + record);
});
container.getContainerProperties().setGroupId(topic + ".group");
container = (ConcurrentMessageListenerContainer<String, String>)
beanFactory.initializeBean(container, topic + ".container");
beanFactory.registerSingleton(topic + ".container", container);
container.start();
});
}
}
要对您的侦听器进行单元测试,
container.getContainerProperties().getMessagelistener()
转换并调用 onMessage()
并验证它是否如您所愿。
EDIT2 单元测试监听器
@SpringBootApplication
public class So53752783Application {
public static void main(String[] args) {
SpringApplication.run(So53752783Application.class, args);
}
@SuppressWarnings("unchecked")
@Bean
public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
MyListener listener) {
return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(listener);
container.getContainerProperties().setGroupId(topic + ".group");
container = (ConcurrentMessageListenerContainer<String, String>)
beanFactory.initializeBean(container, topic + ".container");
beanFactory.registerSingleton(topic + ".container", container);
container.start();
});
}
@Bean
public MyListener listener() {
return new MyListener();
}
public static class MyListener implements MessageListener<String, String> {
@Autowired
private Service service;
public void setService(Service service) {
this.service = service;
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
this.service.callSomeService(data.value().toUpperCase());
}
}
public interface Service {
void callSomeService(String in);
}
@Component
public static class DefaultService implements Service {
@Override
public void callSomeService(String in) {
// ...
}
}
}
和
@RunWith(SpringRunner.class)
@SpringBootTest
public class So53752783ApplicationTests {
@Autowired
private ApplicationContext context;
@Test
public void test() {
ConcurrentMessageListenerContainer<?, ?> container = context.getBean("foo.container",
ConcurrentMessageListenerContainer.class);
MyListener messageListener = (MyListener) container.getContainerProperties().getMessageListener();
Service service = mock(Service.class);
messageListener.setService(service);
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, "key", "foo"));
verify(service).callSomeService("FOO");
}
}