加入包含 Java 个哈希映射对象的 Kafka 流

Joining Kafka Streams containing Java Hash Map Objects

目前我正在构建数据管道。我正在从 sql 数据库 2 tables 中读取,在使用 Kafka 流将它们加入流后,我必须将它们以非规范化格式存储在 OLAP 数据仓库中。

我没有为每个 table 单独的主题,而是让两个 table 将数据插入到一个主题。

我正在将行转换为哈希图,然后使用字节序列化程序将此信息转换为字节数组并推送到主题,因此一行中的所有信息都存储在一个对象中。其代码是:

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] yourBytes = null;
try {
     out = new ObjectOutputStream(bos);
     out.writeObject(record);
     // here record is the row hashmap
     out.flush();
     yourBytes = bos.toByteArray();
}
catch (IOException ex) {
    // ignore close exception
}

在流处理应用程序中,我将字节数组反序列化回 hashmap,并将记录过滤成两个单独的流,每个流对应一个 table。

因此,在将字节数组反序列化回 hashmap 对象后,我在处理阶段的记录如下所示,其中与每个 table 相关的每个流的一条记录如下所示:

(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57})

(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"})

现在我必须将数据加入两个流中,每个值都是一个散列映射,并加入键 PRODUCTID,这是 [=33] 的公共字段=]s,最后为每一行生成一个 hashmap,并将该流推送到一个主题。

所以加入的记录将如下所示:

(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"})

是否可以使用 Kafka 流来执行此操作,如果可以,那么如何?

如果你想在Kafka Streams中加入,你需要提取加入属性并将其设置为消息的键:

KStream streamOfTable1 = ...
streamOfTable1.selectKey(/*extract productId and set as key*/).to("newTopic1");

KStream streamOfTable2 = ...
streamOfTable2.selectKey(/*extract productId and set as key*/).to("newTopic2");

KTable table1 = builder.table("newTopic1");
KTable table2 = builder.table("newTopic2");

table1.join(table2, ...).to("resultTopic");

有关更多详细信息,请参阅文档:http://docs.confluent.io/current/streams/developer-guide.html#joining

我假设您需要 KTable-KTable 连接。请注意,您需要手动创建 "newTopic1" 和 "newTopic2",并且两者都需要具有相同数量的分区。 (比照http://docs.confluent.io/current/streams/developer-guide.html#user-topics

同时检查其他可用的连接类型,以防 KTable-KTable 连接不是您想要的。