Kafka: Consumer API: Regression test fails if run in a group (sequentially)
I have implemented a Kafka application using the consumer API, and I have two regression tests for it written with the streams API:
- Happy-path test: the test produces data into the input topic the application listens on, the application consumes it and produces data into the output topic, and the test consumes from the output topic and validates against the expected output.
- Error-path test: same flow as above, except this time the application produces data into its error topic, and the test consumes from the error topic and validates against the expected error output.
My application code and the regression-test code live in the same project, under the expected directory structure. In both tests the data should be picked up by the same listener on the application side.
The problem:
When I execute the tests individually (manually), each test passes. However, if I execute them together sequentially (e.g. gradle clean build), only the first test passes. The second test fails after the test-side consumer polls for data; after a while it gives up without finding any records.
Observation:
From debugging, everything works fine the first time (test-side and application-side producers and consumers). In the second test, however, the application-side consumer does not seem to receive any data (the test-side producer appears to be producing, but I cannot say for sure), and consequently no data is produced into the error topic.
What I have tried so far:
After investigating, my understanding is that we are running into a race condition, and to avoid it I found suggestions such as:
- Use @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
- Tear down the broker after each test (see the ".destroy()" call on the broker)
- Use different topic names for each test
I applied all of these, but still could not recover from the problem.
I am providing the code here for reference. Any insight is appreciated.
Code of the first test (error path):
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.ERROR_TOPIC
        },
        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }
)
public class AbstractIntegrationFailurePathTest {

    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedFailurePathKafkaBroker;

    // To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate;

    // To read from the error output
    @Autowired
    protected Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer;

    // Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedFailurePathKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}
.
@TestConfiguration
public class AdapterStreamFailurePathTestConfig {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.adapter.application-id}")
    private String applicationId;

    @Value("${spring.kafka.adapter.group-id}")
    private String groupId;

    // Producer of the records that the program consumes
    @Bean
    public Map<String, Object> sendEmailCmdProducerConfigs() {
        Map<String, Object> results = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        results.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.serializer().getClass());
        results.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.INPUT_VALUE_SERDE.serializer().getClass());
        return results;
    }

    @Bean
    public ProducerFactory<PreferredMediaMsgKey, SendEmailCmd> inputProducerFactory() {
        return new DefaultKafkaProducerFactory<>(sendEmailCmdProducerConfigs());
    }

    @Bean
    public KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate() {
        return new KafkaTemplate<>(inputProducerFactory());
    }

    // Consumer of the error output generated by the program
    @Bean
    public Map<String, Object> outputErrorConsumerConfig() {
        Map<String, Object> props = KafkaTestUtils.consumerProps(
                applicationId, Boolean.TRUE.toString(), embeddedKafkaBroker);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.ERROR_VALUE_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer() {
        DefaultKafkaConsumerFactory<PreferredMediaMsgKey, ErrorCmd> rpf =
                new DefaultKafkaConsumerFactory<>(outputErrorConsumerConfig());
        return rpf.createConsumer(groupId, "notification-failure");
    }
}
.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AdapterStreamFailurePathTestConfig.class)
@ActiveProfiles(profiles = "errtest")
public class ErrorPath400Test extends AbstractIntegrationFailurePathTest {

    @Autowired
    private DataGenaratorForErrorPath400Test datagen;

    @Mock
    private AdapterHttpClient httpClient;

    @Autowired
    private ErroredEmailCmdDeserializer erroredEmailCmdDeserializer;

    @Before
    public void setup() throws InterruptedException {
        Mockito.when(httpClient.callApi(Mockito.any()))
                .thenReturn(
                        new GenericResponse(
                                400,
                                TestConstants.ERROR_MSG_TO_CHK));
        Mockito.when(httpClient.createURI(Mockito.any(), Mockito.any(), Mockito.any())).thenCallRealMethod();
        inputProducerTemplate.send(
                projectProerties.getInputTopic(),
                datagen.getKey(),
                datagen.getEmailCmdToProduce());
        System.out.println("producer: " + projectProerties.getInputTopic());
        subscribe(outputErrorConsumer, projectProerties.getErrorTopic(), 0);
    }

    @Test
    public void testWithError() throws InterruptedException, InvalidProtocolBufferException, TextFormat.ParseException {
        ConsumerRecords<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd> records;
        List<ConsumerRecord<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd>> outputListOfErrors = new ArrayList<>();
        int attempt = 0;
        int expectedRecords = 1;
        do {
            records = KafkaTestUtils.getRecords(outputErrorConsumer);
            records.forEach(outputListOfErrors::add);
            attempt++;
        } while (attempt < expectedRecords && outputListOfErrors.size() < expectedRecords);
        // Verify the recipient event stream size
        Assert.assertEquals(expectedRecords, outputListOfErrors.size());
        // Validate output
    }

    @After
    public void tearDown() {
        outputErrorConsumer.close();
        embeddedFailurePathKafkaBroker.destroy();
    }
}
The second test is structurally almost identical, except that this time the test-side consumer consumes from the application's output topic (rather than the error topic). I named the consumers, brokers, producers, and topics differently, like so:
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.OUTPUT_TOPIC
        },
        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }
)
public class AbstractIntegrationSuccessPathTest {

    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    // To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> sendEmailCmdProducerTemplate;

    // To read from the regular output topic
    @Autowired
    protected Consumer<PreferredMediaMsgKey, NotifiedEmailCmd> ouputConsumer;

    // Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}
Please let me know if I should provide more information.
"port=9092"
不要使用固定端口;忽略它,嵌入式代理将使用随机端口;消费者配置在 KafkaTestUtils
中设置为指向随机端口。
您不需要在每个测试方法之后弄脏上下文 - 对每个测试使用不同的 group.id
和不同的 topic
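For illustration, here is a minimal JUnit 4 sketch of that suggestion; the topic and group names are made up for the example and are not from the project above:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class RandomPortSketchTest {

    // No "listeners=..." or "port=9092" broker properties: the embedded broker
    // picks a free random port on its own, so two test classes cannot collide.
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
            new EmbeddedKafkaRule(1, false, "error-path-input", "error-path-error");

    @Test
    public void consumerPropsPointAtTheRandomPort() {
        // consumerProps() wires bootstrap.servers to the broker's actual address,
        // so the test never needs to know the port; the group.id ("error-path-group")
        // should be unique per test class to avoid clashes between tests.
        Map<String, Object> props = KafkaTestUtils.consumerProps(
                "error-path-group", "true", embeddedKafka.getEmbeddedKafka());
        Assert.assertEquals(
                embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
                props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
    }
}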
.
In my case, the consumer was not being closed properly. I had to do:
@After
public void tearDown() {
    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(ouputConsumer::close));
}
to solve it.
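One note on that teardown: a JVM shutdown hook only runs when the whole JVM exits, i.e. once at the end of the build, so if the goal is to release the consumer (and its group) before the next test class starts, closing it directly in @After does that eagerly. A sketch of that alternative (not part of the original answer; ouputConsumer is the autowired test consumer from above):

@After
public void tearDown() {
    // Close eagerly so the consumer leaves its group and frees its connections
    // before the next test class spins up; a shutdown hook would defer this
    // until the JVM exits at the very end of the build.
    ouputConsumer.close();
}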