事件溯源 - Apache Kafka + Kafka Streams - 如何确保原子性/事务性

Event Sourcing - Apache Kafka + Kafka Streams - How to assure atomicity / transactionality

我正在使用 Apache Kafka Streams 评估事件溯源,以了解它在复杂场景中的可行性。与关系数据库一样,我遇到过一些情况 atomicity/transactionality 是必不可少的:

具有两项服务的购物应用程序:

流量:

  1. OrderService 发布一个 OrderCreated 事件(带有 productId、orderId、userId 信息)

  2. ProductService 获取 OrderCreated 事件并查询其 KafkaStreams Store (ProductStockStore) 以检查产品是否有库存。如果有库存,它会发布一个 OrderUpdated 事件(还有 productId、orderId、userId 信息)

关键是这个事件将被 ProductService Kafka Stream 监听,它会处理它以减少库存,到目前为止一切顺利。

但是,想象一下:

  1. 客户1下单,order1(该商品有库存1)
  2. 客户 2 同时为同一产品下了另一个订单 order2(库存仍为 1)
  3. ProductService 处理 order1 并发送消息 OrderUpdated 以减少库存。此消息放在主题中来自 order2 -> OrderCreated
  4. 的消息之后
  5. ProductService 处理 order2-OrderCreated 并发送消息 OrderUpdated 再次减少库存。这是不正确的,因为它会引入不一致(库存现在应该是 0)。

明显的问题是我们的物化视图(商店)应该在我们处理第一个 OrderUpdated 事件时直接更新。然而,更新 Kafka Stream Store 的唯一方法(我知道)是发布另一个事件(OrderUpdated)以由 Kafka Stream 处理。这样我们就无法以事务方式执行此更新。

我会很感激处理这种情况的想法。

更新:我会尝试澄清问题的部分:

ProductService 有一个 Kafka Streams 商店,ProductStock 有这个库存 (productId=1, quantity=1)

OrderServiceorders 主题:

上发布了两个 OrderPlaced 事件

ProductService 在订单主题上有一个消费者。为简单起见,我们假设 单个分区 以确保按顺序消费消息。该消费者执行以下逻辑:

if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
    }else{
        //XXX CANCEL ORDER
    }
}

ProductService还有一个负责更新库存的Kafka Streams处理器:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");

Event1 将首先处理,因为仍有 1 个可用产品,它将生成 ProductReserved 事件。

现在,轮到 Event2 了。如果它在 ProductService 消费者 之前被 ProductService Kafka Streams Processor 处理由 Event1 生成的 ProductReseved 事件,消费者仍然会看到 product1 的 ProductStore 库存为 1,为 Event2 生成 ProductReserved 事件,然后在系统中产生不一致。

同样的问题在确保任何分布式系统的一致性方面都很典型。通常不使用强一致性,而是使用过程 manager/saga 模式。这有点类似于分布式事务中的两阶段提交,但在应用程序代码中明确实现。它是这样的:

订单服务要求产品服务保留 N 件商品。产品服务要么接受命令并减少库存,要么在没有足够的可用商品时拒绝该命令。在对命令作出肯定答复后,订单服务现在可以发出 OrderCreated 事件(尽管我将其称为 OrderPlaced,因为 "placed" 听起来是域惯用的模式,而 "created" 更通用,但这是一个细节) . Product Service 要么监听 OrderPlaced 事件,要么向它发送明确的 ConfirmResevation 命令。或者,如果发生其他事情(例如未能清算资金),则可以发出适当的事件或将 CancelReservation 命令显式发送到 ProductService。为了应对特殊情况,ProductService 也可能有一个调度程序(在 KafkaStreams 中标点符号可以派上用场)来取消在超时期限内未确认或中止的预订。

两个服务的编排技术细节以及处理错误条件和补偿操作(在这种情况下取​​消预订)可以直接在服务中处理,或者在显式的流程管理器组件中处理以分离此责任。就我个人而言,我会选择一个可以使用 Kafka Streams Processor API.

实现的显式流程管理器

这个答案对于你原来的问题来说有点晚了,但为了完整性还是让我回答一下。

有多种方法可以解决这个问题,但我鼓励以事件驱动的方式来解决这个问题。这意味着您 (a) 验证是否有足够的库存来处理订单,以及 (b) 将库存作为单个库存保留,所有这些都在单个 KStreams 操作中完成。诀窍是通过 productId 重新生成密钥,这样您就知道同一产品的订单将在同一线程上按顺序执行(因此您不会陷入 Order1 和 Order2 两次保留同一产品库存的情况)。

有一个 post 会谈讨论如何做到这一点:https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

也许更有用的是一些示例代码也显示了如何完成: https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

请注意,在此 KStreams 代码中,第一行如何重新生成 productId,然后使用 Transformer (a) 验证是否有足够的库存来处理订单 (b)通过更新状态存储来保留所需的库存。这是使用 Kafka 的事务功能以原子方式完成的。