Spring Cloud - Kafka Streams Binder - Testing KafkaStreamsProcessor
I have been trying to write a unit test for a KafkaStreamsProcessor. Here is the processor code:
@EnableBinding(KafkaStreamsProcessor.class)
public class StockProcessor {

    private static final Log LOG = LogFactory.getLog(StockProcessor.class);

    @Autowired
    private EddieClient client;

    @Autowired
    private InventoryRepository inventoryRepository;

    @Autowired
    private PermissionRepository permissionRepository;

    /**
     * Receive message from input queue
     * Apply business logic
     * Send to output queue
     *
     * @param inputMessage the message
     * @return outputMessage
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<?, OutputMessage> process(KStream<?, InputMessage> inputMessage) {
        return inputMessage
            .map((key, value) -> {
                LOG.info("::: processing message...");
                // ... business logic
                return new KeyValue<>(key, outputMessage);
            });
    }
}
application.yml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers:
              - ${NX_KAFKA_SERVERS}
      bindings:
        input:
          destination: ${NX_INPUT_TOPIC}
          content-type: application/json
          group: ${NX_PULL_GROUP_ID}
        output:
          destination: ${NX_OUTPUT_TOPIC}
          content-type: application/json
          group: ${NX_PUSH_GROUP_ID}
Here is what I have put together for the unit test, based on what I have read so far:
public class StockProcessorTest {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    @SpyBean
    private StockProcessor stockProcessor;

    @MockBean
    private EddieClient client;

    @MockBean
    private InventoryRepository inventoryRepository;

    @MockBean
    private PermissionRepository permissionRepository;

    private TopologyTestDriver topologyTestDriver;
    private TestInputTopic<String, InputMessage> inputTopic;
    private TestOutputTopic<String, OutputMessage> outputTopic;
    private Topology topology;
    private Properties config;

    KStream<String, InputMessage> inputMessageStream;
    KStream<String, OutputMessage> outputMessageStream;

    @Before
    public void setup() {
        config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        inputMessageStream = streamsBuilder.stream(INPUT_TOPIC);
        stockProcessor.process(inputMessageStream).to(OUTPUT_TOPIC);

        topology = streamsBuilder.build();
        topologyTestDriver = new TopologyTestDriver(topology, config);
        //???
    }
}
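I really don't know whether I am on the right track here. I am using Jackson serializers. How do I create the inputTopic and outputTopic and test my business logic?
I can provide any further details if needed.
Thanks in advance.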
Here are some examples of how to unit test Kafka Streams applications based on Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java
In addition, here is a test suite with some more advanced examples: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-inventory-count/src/test/java/kafka/streams/inventory/count
These examples include details on how to use Serdes with the test driver.
Please check whether they meet your testing requirements.
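To illustrate the general shape of the approach, here is a minimal sketch of wiring a TestInputTopic and a TestOutputTopic to a TopologyTestDriver. It assumes kafka-streams and kafka-streams-test-utils 2.4 or later on the classpath. The class name TopologyDriverSketch and its process method are hypothetical stand-ins for StockProcessor.process, and the values are plain Strings to keep the sketch small. For JSON payloads you would presumably pass the serializer and deserializer of a JSON serde (for example spring-kafka's JsonSerde) for InputMessage and OutputMessage in the same positions.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyDriverSketch {

    // Hypothetical stand-in for StockProcessor.process(...):
    // keeps the key and upper-cases the value as placeholder "business logic".
    static KStream<String, String> process(KStream<String, String> input) {
        return input.map((key, value) -> new KeyValue<>(key, value.toUpperCase()));
    }

    // Builds the topology, pipes one record through it, and returns the output record.
    static KeyValue<String, String> runOnce(String key, String value) {
        Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
        // The test driver never connects to a broker, so any host:port works here.
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");

        StreamsBuilder builder = new StreamsBuilder();
        // Explicit serdes on the source and sink avoid relying on default serdes.
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        process(input).to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // The driver is Closeable; try-with-resources cleans up its state directory.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            // The input topic needs serializers, the output topic needs deserializers.
            TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

            inputTopic.pipeInput(key, value);
            return outputTopic.readKeyValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("sku-1", "in stock"));
    }
}
```

In a test like the one in the question, the two createInputTopic / createOutputTopic calls would go where the //??? placeholder is, and the driver would be closed in an @After method rather than with try-with-resources.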