从多个 Kafka 主题中消费
Consuming from multiple Kafka topics
我想编写一个 Kafka 应用程序,它使用主题并将某些内容保存在数据库中。主题由 Debezium Kafka connect 基于 mysql binlog 创建。所以我每个 table 有一个主题。
这是我用于从一个主题中消费的代码:
KStream<GenericRecord,mysql.company.tiers.Envelope>[] tierStream = builder.stream("mysql.alopeyk.tiers",
Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));
从架构的角度来看,我应该为每个 table 和 运行 并行创建一个 KStream。但是 table 的数量太大了,拥有那么多的线程可能不是最好的选择。
所有 tables 都有一个名为 created_at 的列(它是一个 laravel 应用程序)所以我很好奇是否有一种方法可以为值提供一个通用的 Serde提取此公共列。这是除了 table.
的名称之外我唯一对其值感兴趣的列
这完全是关于您的值如何被生成消息的应用程序(连接器)序列化的。
如果 Deserializer
(Serdes
) 可以从不同类型的消息中提取 created_at
是可能的。
所以,答案是肯定的,但这取决于您的消息价值 nad Deserializer
。
假设序列化后的所有消息格式如下:
- create_at;姓名:职位;...
- create_at;城市,国家;...
- create_at;product_name;...
在这种情况下 Deserializer
只需要将字符取到第一个 ;
并将其转换为日期,其余值可以删除。
示例代码:
public class CustomDeserializer implements Deserializer<Date> {
@Override
public Date deserialize(String topic, byte[] data) {
String strDate = new String(data);
return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
}
}
我想编写一个 Kafka 应用程序,它使用主题并将某些内容保存在数据库中。主题由 Debezium Kafka connect 基于 mysql binlog 创建。所以我每个 table 有一个主题。 这是我用于从一个主题中消费的代码:
KStream<GenericRecord,mysql.company.tiers.Envelope>[] tierStream = builder.stream("mysql.alopeyk.tiers",
Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));
从架构的角度来看,我应该为每个 table 和 运行 并行创建一个 KStream。但是 table 的数量太大了,拥有那么多的线程可能不是最好的选择。
所有 tables 都有一个名为 created_at 的列(它是一个 laravel 应用程序)所以我很好奇是否有一种方法可以为值提供一个通用的 Serde提取此公共列。这是除了 table.
的名称之外我唯一对其值感兴趣的列这完全是关于您的值如何被生成消息的应用程序(连接器)序列化的。
如果 Deserializer
(Serdes
) 可以从不同类型的消息中提取 created_at
是可能的。
所以,答案是肯定的,但这取决于您的消息价值 nad Deserializer
。
假设序列化后的所有消息格式如下:
- create_at;姓名:职位;...
- create_at;城市,国家;...
- create_at;product_name;...
在这种情况下 Deserializer
只需要将字符取到第一个 ;
并将其转换为日期,其余值可以删除。
示例代码:
public class CustomDeserializer implements Deserializer<Date> {
@Override
public Date deserialize(String topic, byte[] data) {
String strDate = new String(data);
return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
}
}