为什么 Kafka Streams 没有 @StreamListener 就不能工作?
Why Kafka Streams don't work without @StreamListener?
@StreamListener 现在已被弃用,所以我尝试以功能方式重写我的消费者。
Java代码
@Component
public class MyConsumer {
public void handleMyMethod(MessageEntity message) {
...
}
}
@RequiredArgsConstructor
@Configuration
public class MyConsumerConfiguration {
private final MyConsumer myConsumer;
@Bean
public Consumer<MessageEntity> myMethod() {
return myConsumer::handleMyMethod;
}
}
application.yaml
spring.cloud :
stream :
kafka.binder.brokers :
- "127.0.0.1:9092"
function.definition : myMethod
bindings :
myMethod-in-0 :
destination : service.method.update
测试
@Test
void handleMyMethod() {
MessageEntity message = new MessageEntity();
template.send("service.method.update", message);
await()
.atMost(6, TimeUnit.SECONDS)
.untilAsserted(() -> {
verify(myConsumer, atLeastOnce()).handleMyMethod(entityCaptor.capture());
MessageEntity entity = entityCaptor.getValue();
assertNotNull(entity);
});
}
运行 测试。消息传到 Kafka(我在 Kafka 工具中看到它),但 MyConsumer 没有捕捉到它。测试失败并出现错误:
myConsumer.handleMethod(
<Capturing argument>
);
-> at com.my.project.MyConsumer.handleMethod(MyConsumer.java:33)
Actually, there were zero interactions with this mock.```
我发现了问题。我的 util 模块,集成在我上面描述的模块中,有自己的“function.definition”属性。由于上面的模块在 util 模块之前进行配置,因此 util 模块会从顶级模块中删除方法。我是这样解决问题的:
@Autowired
StreamFunctionProperties streamFunctionProperties;
...
String definitionString = StringUtils.hasText(streamFunctionProperties.getDefinition()) ?
streamFunctionProperties.getDefinition() + ";" :
"";
streamFunctionProperties.setDefinition(definitionString + "methodDeleteOut;methodUpdateOut;roleUpdateIn;roleDeleteIn");
@StreamListener 现在已被弃用,所以我尝试以功能方式重写我的消费者。
Java代码
@Component
public class MyConsumer {
public void handleMyMethod(MessageEntity message) {
...
}
}
@RequiredArgsConstructor
@Configuration
public class MyConsumerConfiguration {
private final MyConsumer myConsumer;
@Bean
public Consumer<MessageEntity> myMethod() {
return myConsumer::handleMyMethod;
}
}
application.yaml
spring.cloud :
stream :
kafka.binder.brokers :
- "127.0.0.1:9092"
function.definition : myMethod
bindings :
myMethod-in-0 :
destination : service.method.update
测试
@Test
void handleMyMethod() {
MessageEntity message = new MessageEntity();
template.send("service.method.update", message);
await()
.atMost(6, TimeUnit.SECONDS)
.untilAsserted(() -> {
verify(myConsumer, atLeastOnce()).handleMyMethod(entityCaptor.capture());
MessageEntity entity = entityCaptor.getValue();
assertNotNull(entity);
});
}
运行 测试。消息传到 Kafka(我在 Kafka 工具中看到它),但 MyConsumer 没有捕捉到它。测试失败并出现错误:
myConsumer.handleMethod(
<Capturing argument>
);
-> at com.my.project.MyConsumer.handleMethod(MyConsumer.java:33)
Actually, there were zero interactions with this mock.```
我发现了问题。我的 util 模块,集成在我上面描述的模块中,有自己的“function.definition”属性。由于上面的模块在 util 模块之前进行配置,因此 util 模块会从顶级模块中删除方法。我是这样解决问题的:
@Autowired
StreamFunctionProperties streamFunctionProperties;
...
String definitionString = StringUtils.hasText(streamFunctionProperties.getDefinition()) ?
streamFunctionProperties.getDefinition() + ";" :
"";
streamFunctionProperties.setDefinition(definitionString + "methodDeleteOut;methodUpdateOut;roleUpdateIn;roleDeleteIn");