Spring 云合同 - 没有为主题设置消费者 [...]
Spring Cloud Contracts - No consumer set up for topic [...]
我目前正在使用 Spring 云合同开发 API 兼容性检查。我设置了文档中所说的一切。但是我遇到了一个问题——java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]
。该异常在 KafkaStubMessages
class 中抛出。所以我认为这是图书馆相关的问题。在我的项目中,我有两个独立的 Maven 项目。他们每个人都是消费者和生产者(单独的主题)。我的合同放在其他存储库中。
所以...我目前正在处理以下场景:
我们有 2 个模块 - 模块 A 和 B。模块 A 向主题 T1 和 T2 的 Kafka 主题 t1 和 t2 发送消息,并从主题 T3 和 T4 接收消息 t3 和 t4。模块 B 从 T1 和 T2 接收并发送到 T3 和 T4。
所有消费者测试都通过了每个模块。但是生产者测试最终出现了主题中提到的错误。
我怀疑这是存根创建错误造成的。所以没有设置合适的监听器。
我尝试了不同的kafka配置,但我相信情况并非如此。我还检查了 spring 云合同配置,但似乎一切正常。生成带有存根的适当罐子。不幸的是,Google 在这件事上帮不上什么忙。
如果您需要任何信息来帮助我,请随时询问。
我现在正在处理它好几天了,所以我很绝望,真的需要你的帮助。
编辑:添加堆栈跟踪和相关代码片段
堆栈跟踪:
java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]
at org.springframework.cloud.contract.verifier.messaging.kafka.Receiver.receive(KafkaStubMessages.java:110)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:80)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:42)
at com.comarch.fsm.dispatcher.rest.ContractBaseTest.setup(ContractBaseTest.groovy:56)
基础测试class配置:
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", brokerProperties = ["log.dir=target/embedded-kafka"])
@AutoConfigureStubRunner
abstract class BaseTestConfig extends Specification {
}
我的合约定义:
Pattern customDateTime() {
Pattern.compile('([0-9]{4})-(1[0-2]|0[1-9])-(0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])Z')
}
Contract.make {
label("sync")
input {
triggeredBy("sync()")
}
outputMessage {
sentTo("testSyncTopic")
body(
syncStart: $(customDateTime())
)
}
}
ContractBaseTest class:
abstract class ContractBaseTest extends BaseTestConfig {
@Autowired
private KafkaService kafkaService;
def synchronizeData() {
kafkaService.sendKafkaMessage("testSyncTopic", null, new SyncDto(new Date()));
}
}
为什么你的基础测试 class 有 @AutoConfigureStubRunner
而它应该有 @AutoConfigureMessageVerifier
?看来你把消费者和生产者混为一谈了。
请在此处查看使用 Kafka 的生产者示例:https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka。出于可读性原因,我将在此处复制粘贴它。
制作人
package com.example;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
// remove::start[]
import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
import org.springframework.kafka.test.context.EmbeddedKafka;
// remove::end[]
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
// remove::start[]
@AutoConfigureMessageVerifier
@EmbeddedKafka(partitions = 1, topics = {"topic1"})
// remove::end[]
@ActiveProfiles("test")
public abstract class BaseClass {
@Autowired
Controller controller;
public void trigger() {
this.controller.sendFoo("example");
}
}
在这里你可以找到控制器
package com.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import com.common.Foo1;
/**
* @author Gary Russell
* @since 2.2.1
*/
@RestController
public class Controller {
@Autowired
private KafkaTemplate<Object, Object> template;
@PostMapping(path = "/send/foo/{what}")
public void sendFoo(@PathVariable String what) {
this.template.send("topic1", new Foo1(what));
}
}
这里可以看到生产配置(application.yml
)
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging.level.org.springframework.cloud.contract: debug
在这里你可以看到测试配置(application-test.yml
)
spring:
kafka:
bootstrap-servers: ${spring.embedded.kafka.brokers}
consumer:
properties:
"key.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
"key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
group-id: groupId
import org.springframework.cloud.contract.spec.Contract
Contract.make {
label("trigger")
input {
triggeredBy("trigger()")
}
outputMessage {
sentTo("topic1")
body([
foo: "example"
])
}
}
消费者
现在是消费者时间 (https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/master/consumer_kafka)
package com.example;
import org.assertj.core.api.BDDAssertions;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
// remove::start[]
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.kafka.test.context.EmbeddedKafka;
// remove::end[]
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
// remove::start[]
@AutoConfigureStubRunner(ids = "com.example:beer-api-producer-kafka", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
@EmbeddedKafka(topics = "topic1")
// remove::end[]
@ActiveProfiles("test")
public class ApplicationTests {
// remove::start[]
@Autowired
StubTrigger trigger;
@Autowired
Application application;
@Test
public void contextLoads() {
this.trigger.trigger("trigger");
Awaitility.await().untilAsserted(() -> {
BDDAssertions.then(this.application.storedFoo).isNotNull();
BDDAssertions.then(this.application.storedFoo.getFoo()).contains("example");
});
}
// remove::end[]
}
我目前正在使用 Spring 云合同开发 API 兼容性检查。我设置了文档中所说的一切。但是我遇到了一个问题——java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]
。该异常在 KafkaStubMessages
class 中抛出。所以我认为这是图书馆相关的问题。在我的项目中,我有两个独立的 Maven 项目。他们每个人都是消费者和生产者(单独的主题)。我的合同放在其他存储库中。
所以...我目前正在处理以下场景: 我们有 2 个模块 - 模块 A 和 B。模块 A 向主题 T1 和 T2 的 Kafka 主题 t1 和 t2 发送消息,并从主题 T3 和 T4 接收消息 t3 和 t4。模块 B 从 T1 和 T2 接收并发送到 T3 和 T4。
所有消费者测试都通过了每个模块。但是生产者测试最终出现了主题中提到的错误。
我怀疑这是存根创建错误造成的。所以没有设置合适的监听器。
我尝试了不同的kafka配置,但我相信情况并非如此。我还检查了 spring 云合同配置,但似乎一切正常。生成带有存根的适当罐子。不幸的是,Google 在这件事上帮不上什么忙。
如果您需要任何信息来帮助我,请随时询问。 我现在正在处理它好几天了,所以我很绝望,真的需要你的帮助。
编辑:添加堆栈跟踪和相关代码片段
堆栈跟踪:
java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]
at org.springframework.cloud.contract.verifier.messaging.kafka.Receiver.receive(KafkaStubMessages.java:110)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:80)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:42)
at com.comarch.fsm.dispatcher.rest.ContractBaseTest.setup(ContractBaseTest.groovy:56)
基础测试class配置:
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", brokerProperties = ["log.dir=target/embedded-kafka"])
@AutoConfigureStubRunner
abstract class BaseTestConfig extends Specification {
}
我的合约定义:
Pattern customDateTime() {
Pattern.compile('([0-9]{4})-(1[0-2]|0[1-9])-(0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])Z')
}
Contract.make {
label("sync")
input {
triggeredBy("sync()")
}
outputMessage {
sentTo("testSyncTopic")
body(
syncStart: $(customDateTime())
)
}
}
ContractBaseTest class:
abstract class ContractBaseTest extends BaseTestConfig {
@Autowired
private KafkaService kafkaService;
def synchronizeData() {
kafkaService.sendKafkaMessage("testSyncTopic", null, new SyncDto(new Date()));
}
}
为什么你的基础测试 class 有 @AutoConfigureStubRunner
而它应该有 @AutoConfigureMessageVerifier
?看来你把消费者和生产者混为一谈了。
请在此处查看使用 Kafka 的生产者示例:https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka。出于可读性原因,我将在此处复制粘贴它。
制作人
package com.example;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
// remove::start[]
import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
import org.springframework.kafka.test.context.EmbeddedKafka;
// remove::end[]
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
// remove::start[]
@AutoConfigureMessageVerifier
@EmbeddedKafka(partitions = 1, topics = {"topic1"})
// remove::end[]
@ActiveProfiles("test")
public abstract class BaseClass {
@Autowired
Controller controller;
public void trigger() {
this.controller.sendFoo("example");
}
}
在这里你可以找到控制器
package com.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import com.common.Foo1;
/**
* @author Gary Russell
* @since 2.2.1
*/
@RestController
public class Controller {
@Autowired
private KafkaTemplate<Object, Object> template;
@PostMapping(path = "/send/foo/{what}")
public void sendFoo(@PathVariable String what) {
this.template.send("topic1", new Foo1(what));
}
}
这里可以看到生产配置(application.yml
)
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging.level.org.springframework.cloud.contract: debug
在这里你可以看到测试配置(application-test.yml
)
spring:
kafka:
bootstrap-servers: ${spring.embedded.kafka.brokers}
consumer:
properties:
"key.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
"key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
group-id: groupId
import org.springframework.cloud.contract.spec.Contract
Contract.make {
label("trigger")
input {
triggeredBy("trigger()")
}
outputMessage {
sentTo("topic1")
body([
foo: "example"
])
}
}
消费者
现在是消费者时间 (https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/master/consumer_kafka)
package com.example;
import org.assertj.core.api.BDDAssertions;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
// remove::start[]
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.kafka.test.context.EmbeddedKafka;
// remove::end[]
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
// remove::start[]
@AutoConfigureStubRunner(ids = "com.example:beer-api-producer-kafka", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
@EmbeddedKafka(topics = "topic1")
// remove::end[]
@ActiveProfiles("test")
public class ApplicationTests {
// remove::start[]
@Autowired
StubTrigger trigger;
@Autowired
Application application;
@Test
public void contextLoads() {
this.trigger.trigger("trigger");
Awaitility.await().untilAsserted(() -> {
BDDAssertions.then(this.application.storedFoo).isNotNull();
BDDAssertions.then(this.application.storedFoo.getFoo()).contains("example");
});
}
// remove::end[]
}