当从不同主题中具有相同键的主题加载 GlobalKTable 时会发生什么?
What happens when a GlobalKTable is loaded from a topic with the same key in different topics?
我们有一个只有一个分区的压缩主题,我们向它添加了一个新分区。
我们没有对现有数据进行重新分区 - 这意味着在添加新分区之前加载的事件仍在分区 0
中。并且新事件按照预期的标准策略存储:具有相同键的所有事件都存储在同一分区中。
我们目前遇到这样的情况:
Partition Offset Timestamp Key Value
0 586 1545388284240 COD_ISIN AAA
1 983 1551800369978 COD_ISIN BBB
1 1141 1556526044144 COD_ISIN CCC
当我在 GlobalKTable
中加载此主题时,商店中的值为 AAA
。我们显然期望 CCC
作为当前值。
GlobalKTable<String, JsonNode> storeDatacatalog = builder.globalTable(TOPIC, consumed, Materialized.as(STORE_DATACATALOG));
KStream<String, JsonNode> inEvent = builder.stream(OTHER_TOPIC, consumed);
inEvent = inEvent.transform(
new TransformerSupplier<String, JsonNode, KeyValue<String, JsonNode>>() {
@Override
public Transformer<String, JsonNode, KeyValue<String, JsonNode>> get() {
return new Transformer<String, JsonNode, KeyValue<String, JsonNode>>() {
private ProcessorContext context;
private KeyValueStore<String, JsonNode> dataCatalogueState;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.dataCatalogueState = (KeyValueStore<String, JsonNode>) context.getStateStore(STORE_DATACATALOG);
LOGGER.debug("Content of dataCatalogueState: ");
KeyValueIterator<String, JsonNode> allDc = this.dataCatalogueState.all();
JsonNode valueForIsin = null;
while (allDc.hasNext()) {
try {
KeyValue<String, JsonNode> next = allDc.next();
LOGGER.debug(" | " + next.key + " : " + next.value);
if ("COD_ISIN".equals(next.key))
valueForIsin = next.value;
} catch (Exception e) {
LOGGER.debug("exc" , e.getMessage());
}
}
LOGGER.info(" COD_ISIN ---> " + valueForIsin);
}
@Override
public void close() {
}
@Override
public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
return new KeyValue<>(key, value);
}
//@Override
public KeyValue<String, JsonNode> punctuate(long timestamp) {
// TODO Auto-generated method stub
return null;
}
};
}
}
)
GlobalKTable
是如何构建其状态的?是基于 Offset
还是基于 Timestamp
?
它是否在内部将密钥粘贴到找到密钥的第一个分区?
我知道如何解决(清除主题并再次填充 - 将应用分区策略)。但我很好奇它在内部是如何工作的。
GlobalKTable
假设数据是按键分区的。因此,如果您在不同分区中有具有相同键的记录,则无法保证将应用哪个订单记录。仅保证每个分区的顺序。除此之外,atm,更新仅基于分区内的偏移量。
使用上面的示例,顺序可以是
- AAA、BBB、CCC
- BBB、AAA、CCC
- BBB、CCC、AAA
只能保证,BBB 将在 CCC 之前应用。
我们有一个只有一个分区的压缩主题,我们向它添加了一个新分区。
我们没有对现有数据进行重新分区 - 这意味着在添加新分区之前加载的事件仍在分区 0
中。并且新事件按照预期的标准策略存储:具有相同键的所有事件都存储在同一分区中。
我们目前遇到这样的情况:
Partition Offset Timestamp Key Value
0 586 1545388284240 COD_ISIN AAA
1 983 1551800369978 COD_ISIN BBB
1 1141 1556526044144 COD_ISIN CCC
当我在 GlobalKTable
中加载此主题时,商店中的值为 AAA
。我们显然期望 CCC
作为当前值。
GlobalKTable<String, JsonNode> storeDatacatalog = builder.globalTable(TOPIC, consumed, Materialized.as(STORE_DATACATALOG));
KStream<String, JsonNode> inEvent = builder.stream(OTHER_TOPIC, consumed);
inEvent = inEvent.transform(
new TransformerSupplier<String, JsonNode, KeyValue<String, JsonNode>>() {
@Override
public Transformer<String, JsonNode, KeyValue<String, JsonNode>> get() {
return new Transformer<String, JsonNode, KeyValue<String, JsonNode>>() {
private ProcessorContext context;
private KeyValueStore<String, JsonNode> dataCatalogueState;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.dataCatalogueState = (KeyValueStore<String, JsonNode>) context.getStateStore(STORE_DATACATALOG);
LOGGER.debug("Content of dataCatalogueState: ");
KeyValueIterator<String, JsonNode> allDc = this.dataCatalogueState.all();
JsonNode valueForIsin = null;
while (allDc.hasNext()) {
try {
KeyValue<String, JsonNode> next = allDc.next();
LOGGER.debug(" | " + next.key + " : " + next.value);
if ("COD_ISIN".equals(next.key))
valueForIsin = next.value;
} catch (Exception e) {
LOGGER.debug("exc" , e.getMessage());
}
}
LOGGER.info(" COD_ISIN ---> " + valueForIsin);
}
@Override
public void close() {
}
@Override
public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
return new KeyValue<>(key, value);
}
//@Override
public KeyValue<String, JsonNode> punctuate(long timestamp) {
// TODO Auto-generated method stub
return null;
}
};
}
}
)
GlobalKTable
是如何构建其状态的?是基于 Offset
还是基于 Timestamp
?
它是否在内部将密钥粘贴到找到密钥的第一个分区?
我知道如何解决(清除主题并再次填充 - 将应用分区策略)。但我很好奇它在内部是如何工作的。
GlobalKTable
假设数据是按键分区的。因此,如果您在不同分区中有具有相同键的记录,则无法保证将应用哪个订单记录。仅保证每个分区的顺序。除此之外,atm,更新仅基于分区内的偏移量。
使用上面的示例,顺序可以是
- AAA、BBB、CCC
- BBB、AAA、CCC
- BBB、CCC、AAA
只能保证,BBB 将在 CCC 之前应用。