Kafka 流:从应用程序的每个实例中的所有分区读取
Kafka streams: Read from ALL partitions in every instance of an application
使用 KTable 时,当实例/消费者的数量等于分区的数量时,Kafka 流不允许实例从特定主题的多个分区读取数据。我尝试使用 GlobalKTable 实现此目的,问题是数据将被覆盖,并且无法对其应用聚合。
假设我有一个名为 "data_in" 的主题,有 3 个分区(P1、P2、P3)。当我 运行 Kafka 流应用程序的 3 个实例(I1、I2、I3)时,我希望每个实例都从 "data_in" 的所有分区读取数据。我的意思是 I1 可以从 P1、P2 和 P3 读取,I2 可以从 P1、P2 和 P3 读取,I2 等等。
编辑:请记住,生产者可以将两个相似的 ID 发布到 "data_in" 中的两个不同分区中。所以当运行宁两个不同的实例时,GlobalKtable将被覆盖。
请问如何实现?这是我的代码的一部分
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
要么将输入主题 "data_in" 的分区数更改为 1 个分区,要么使用 GlobalKtable
从主题中的所有分区获取数据,然后您就可以使用它加入流。这样一来,您的应用程序实例不再需要位于不同的消费者组中。
代码将如下所示:
private GlobalKTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic
KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized)
.to("agg_data_in");
return getBuilder().globalTable("agg_data_in");
}
编辑:我编辑了上面的代码以强制对名为 "new_data_in".
的主题进行重新分区
使用 KTable 时,当实例/消费者的数量等于分区的数量时,Kafka 流不允许实例从特定主题的多个分区读取数据。我尝试使用 GlobalKTable 实现此目的,问题是数据将被覆盖,并且无法对其应用聚合。
假设我有一个名为 "data_in" 的主题,有 3 个分区(P1、P2、P3)。当我 运行 Kafka 流应用程序的 3 个实例(I1、I2、I3)时,我希望每个实例都从 "data_in" 的所有分区读取数据。我的意思是 I1 可以从 P1、P2 和 P3 读取,I2 可以从 P1、P2 和 P3 读取,I2 等等。
编辑:请记住,生产者可以将两个相似的 ID 发布到 "data_in" 中的两个不同分区中。所以当运行宁两个不同的实例时,GlobalKtable将被覆盖。
请问如何实现?这是我的代码的一部分
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
要么将输入主题 "data_in" 的分区数更改为 1 个分区,要么使用 GlobalKtable
从主题中的所有分区获取数据,然后您就可以使用它加入流。这样一来,您的应用程序实例不再需要位于不同的消费者组中。
代码将如下所示:
private GlobalKTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic
KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized)
.to("agg_data_in");
return getBuilder().globalTable("agg_data_in");
}
编辑:我编辑了上面的代码以强制对名为 "new_data_in".
的主题进行重新分区