当从不同主题中具有相同键的主题加载 GlobalKTable 时会发生什么?

What happens when a GlobalKTable is loaded from a topic with the same key in different topics?

我们有一个只有一个分区的压缩主题,我们向它添加了一个新分区。

我们没有对现有数据进行重新分区 - 这意味着在添加新分区之前加载的事件仍在分区 0 中。并且新事件按照预期的标准策略存储:具有相同键的所有事件都存储在同一分区中。

我们目前遇到这样的情况:

Partition    Offset    Timestamp      Key         Value
0            586       1545388284240  COD_ISIN    AAA
1            983       1551800369978  COD_ISIN    BBB
1            1141      1556526044144  COD_ISIN    CCC

当我在 GlobalKTable 中加载此主题时,商店中的值为 AAA。我们显然期望 CCC 作为当前值。

GlobalKTable<String, JsonNode> storeDatacatalog = builder.globalTable(TOPIC, consumed,  Materialized.as(STORE_DATACATALOG));

KStream<String, JsonNode> inEvent = builder.stream(OTHER_TOPIC, consumed);

inEvent = inEvent.transform(
    new TransformerSupplier<String, JsonNode, KeyValue<String, JsonNode>>() {

        @Override
        public Transformer<String, JsonNode, KeyValue<String, JsonNode>> get() {

            return new Transformer<String, JsonNode, KeyValue<String, JsonNode>>() {

                private ProcessorContext context;
                private KeyValueStore<String, JsonNode> dataCatalogueState;

                @Override
                public void init(ProcessorContext context) {

                    this.context = context;
                    this.dataCatalogueState = (KeyValueStore<String, JsonNode>) context.getStateStore(STORE_DATACATALOG);

                    LOGGER.debug("Content of dataCatalogueState: ");
                    KeyValueIterator<String, JsonNode> allDc = this.dataCatalogueState.all();

                    JsonNode valueForIsin = null;

                    while (allDc.hasNext()) {
                        try {
                            KeyValue<String, JsonNode> next = allDc.next();
                            LOGGER.debug(" | " + next.key + " : " + next.value);
                            if ("COD_ISIN".equals(next.key)) 
                                valueForIsin = next.value;
                        } catch (Exception e) {
                            LOGGER.debug("exc" , e.getMessage());
                        }
                    }
                    LOGGER.info(" COD_ISIN ---> " + valueForIsin);
                }

                @Override
                public void close() {
                }

                @Override
                public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
                    return new KeyValue<>(key, value);
                }

                //@Override
                public KeyValue<String, JsonNode> punctuate(long timestamp) {
                    // TODO Auto-generated method stub
                    return null;
                }
            };
        }
    }
)

GlobalKTable 是如何构建其状态的?是基于 Offset 还是基于 Timestamp? 它是否在内部将密钥粘贴到找到密钥的第一个分区?

我知道如何解决(清除主题并再次填充 - 将应用分区策略)。但我很好奇它在内部是如何工作的。

GlobalKTable 假设数据是按键分区的。因此,如果您在不同分区中有具有相同键的记录,则无法保证将应用哪个订单记录。仅保证每个分区的顺序。除此之外,atm,更新仅基于分区内的偏移量。

使用上面的示例,顺序可以是

  • AAA、BBB、CCC
  • BBB、AAA、CCC
  • BBB、CCC、AAA

只能保证,BBB 将在 CCC 之前应用。