从多个 Kafka 主题中消费

Consuming from multiple Kafka topics

我想编写一个 Kafka 应用程序,它使用主题并将某些内容保存在数据库中。主题由 Debezium Kafka connect 基于 mysql binlog 创建。所以我每个 table 有一个主题。 这是我用于从一个主题中消费的代码:

KStream<GenericRecord,mysql.company.tiers.Envelope>[] tierStream = builder.stream("mysql.alopeyk.tiers",
                Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));

从架构的角度来看,我应该为每个 table 和 运行 并行创建一个 KStream。但是 table 的数量太大了,拥有那么多的线程可能不是最好的选择。

所有 tables 都有一个名为 created_at 的列(它是一个 laravel 应用程序)所以我很好奇是否有一种方法可以为值提供一个通用的 Serde提取此公共列。这是除了 table.

的名称之外我唯一对其值感兴趣的列

这完全是关于您的值如何被生成消息的应用程序(连接器)序列化的。 如果 Deserializer (Serdes) 可以从不同类型的消息中提取 created_at 是可能的。

所以,答案是肯定的,但这取决于您的消息价值 nad Deserializer

假设序列化后的所有消息格式如下:

  • create_at;姓名:职位;...
  • create_at;城市,国家;...
  • create_at;product_name;...

在这种情况下 Deserializer 只需要将字符取到第一个 ; 并将其转换为日期,其余值可以删除。

示例代码:

public class CustomDeserializer implements Deserializer<Date> {

    @Override
    public Date deserialize(String topic, byte[] data) {
        String strDate = new String(data);
        return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
    }
}