如何将记录拆分为不同的流,从一个主题到不同的流?

How to split records into different streams, from one topic to different streams?

我有一个包含不同大小记录的单一源 CSV 文件,将每条记录推送到一个源主题中。我想将记录拆分为来自该源主题的不同 KStreams/KTables。我有一个 table 负载的管道,我将源主题中的记录以分隔格式推送到 stream1,然后将记录推送到另一个 AVRO 格式的流,然后将其推送到 JDBC 接收器将记录推送到 MySQL 数据库的连接器。管道需要相同。但是我想将不同 table 的记录推送到一个源主题中,然后根据一个值将记录拆分到不同的流中。这可能吗?我尝试寻找方法来做到这一点,但没有成功。我也可以以某种方式改进管道或使用 KTable 代替 KStreams 或任何其他修改吗?

我目前的流量- 一个源 CSV 文件 (source.csv) -> 源主题(名称 - 包含 test1 记录的源主题) -> 流 1(定界值格式) -> 流 2(作为 AVRO 值格式) -> 结束主题(名称 - sink-db-test1) -> JDBC 接收器连接器 -> MySQL DB (name - test1)

我有一个不同的 MySQL table test2,它的架构不同,table 的记录也存在于 source.csv 文件中。由于架构不同,我无法按照 test1 的当前管道将数据插入 test2 table.

示例 - 在 CSV 源文件中,

line 1 - 9,atm,mun,ronaldo line 2- 10,atm,mun,bravo,num2 line 3 - 11,atm,sign,bravo,sick

在此示例中,要拆分的值是 column 4ronaldobravo) 所有这些数据应分别加载到 table 1table 2table 3 关键是第 4 列。

if col4==ronaldo, go to table 1 if col4==bravo and col3==mun, go to table 2 if col4==bravo and col3 ==sign go to table 3

我是 Kafka 的新手,从上周开始进行 Kafka 开发。

您可以编写一个单独的 Kafka Streams 应用程序,使用 KStream#branch() operator:

将记录从输入主题拆分到不同的 KStream 或输出主题
KStream<K, V>[] branches = streamsBuilder.branch(
        (key, value) -> {filter logic for topic 1 here},
        (key, value) -> {filter logic for topic 2 here},
        (key, value) -> true//get all messages for this branch
);

// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3

或者您可以像这样手动分支您的 KStream:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));

inputKStream
        .filter((key, value) -> {filter logic for topic 1 here})
        .to("your_1st_output_topic");

inputKStream
        .filter((key, value) -> {filter logic for topic 2 here})
        .to("your_2nd_output_topic");
...

我能够拆分数据并将 KSQL 用于我在下面分享的方法。 1. 使用 value_format='JSON' 和列 payload 作为 STRING 创建输入流 2. 有效负载将包含整个记录作为 STRING 3. 然后使用 WHERE 子句中的 LIKE 运算符将记录拆分为不同的流,同时根据要求将有效负载放入不同的流中。在这里,我使用了 KSQL 的 SPLIT 运算符从有效载荷中获取逗号分隔格式的记录