如何将记录拆分为不同的流,从一个主题到不同的流?
How to split records into different streams, from one topic to different streams?
我有一个包含不同大小记录的单一源 CSV 文件,将每条记录推送到一个源主题中。我想将记录拆分为来自该源主题的不同 KStreams/KTables。我有一个 table 负载的管道,我将源主题中的记录以分隔格式推送到 stream1,然后将记录推送到另一个 AVRO 格式的流,然后将其推送到 JDBC 接收器将记录推送到 MySQL 数据库的连接器。管道需要相同。但是我想将不同 table 的记录推送到一个源主题中,然后根据一个值将记录拆分到不同的流中。这可能吗?我尝试寻找方法来做到这一点,但没有成功。我也可以以某种方式改进管道或使用 KTable 代替 KStreams 或任何其他修改吗?
我目前的流量-
一个源 CSV 文件 (source.csv
) -> 源主题(名称 - 包含 test1 记录的源主题) -> 流 1(定界值格式) -> 流 2(作为 AVRO 值格式) -> 结束主题(名称 - sink-db-test1
) -> JDBC 接收器连接器 -> MySQL DB (name - test1
)
我有一个不同的 MySQL table test2
,它的架构不同,table 的记录也存在于 source.csv
文件中。由于架构不同,我无法按照 test1
的当前管道将数据插入 test2
table.
示例 -
在 CSV 源文件中,
line 1 - 9,atm,mun,ronaldo
line 2- 10,atm,mun,bravo,num2
line 3 - 11,atm,sign,bravo,sick
在此示例中,要拆分的值是 column 4
(ronaldo
或 bravo
)
所有这些数据应分别加载到 table 1
、table 2
、table 3
关键是第 4 列。
if col4==ronaldo, go to table 1
if col4==bravo and col3==mun, go to table 2
if col4==bravo and col3 ==sign go to table 3
我是 Kafka 的新手,从上周开始进行 Kafka 开发。
您可以编写一个单独的 Kafka Streams 应用程序,使用 KStream#branch()
operator:
将记录从输入主题拆分到不同的 KStream 或输出主题
KStream<K, V>[] branches = streamsBuilder.branch(
(key, value) -> {filter logic for topic 1 here},
(key, value) -> {filter logic for topic 2 here},
(key, value) -> true//get all messages for this branch
);
// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3
或者您可以像这样手动分支您的 KStream:
KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));
inputKStream
.filter((key, value) -> {filter logic for topic 1 here})
.to("your_1st_output_topic");
inputKStream
.filter((key, value) -> {filter logic for topic 2 here})
.to("your_2nd_output_topic");
...
我能够拆分数据并将 KSQL 用于我在下面分享的方法。
1. 使用 value_format='JSON'
和列 payload
作为 STRING
创建输入流
2. 有效负载将包含整个记录作为 STRING
3. 然后使用 WHERE
子句中的 LIKE
运算符将记录拆分为不同的流,同时根据要求将有效负载放入不同的流中。在这里,我使用了 KSQL 的 SPLIT
运算符从有效载荷中获取逗号分隔格式的记录
我有一个包含不同大小记录的单一源 CSV 文件,将每条记录推送到一个源主题中。我想将记录拆分为来自该源主题的不同 KStreams/KTables。我有一个 table 负载的管道,我将源主题中的记录以分隔格式推送到 stream1,然后将记录推送到另一个 AVRO 格式的流,然后将其推送到 JDBC 接收器将记录推送到 MySQL 数据库的连接器。管道需要相同。但是我想将不同 table 的记录推送到一个源主题中,然后根据一个值将记录拆分到不同的流中。这可能吗?我尝试寻找方法来做到这一点,但没有成功。我也可以以某种方式改进管道或使用 KTable 代替 KStreams 或任何其他修改吗?
我目前的流量-
一个源 CSV 文件 (source.csv
) -> 源主题(名称 - 包含 test1 记录的源主题) -> 流 1(定界值格式) -> 流 2(作为 AVRO 值格式) -> 结束主题(名称 - sink-db-test1
) -> JDBC 接收器连接器 -> MySQL DB (name - test1
)
我有一个不同的 MySQL table test2
,它的架构不同,table 的记录也存在于 source.csv
文件中。由于架构不同,我无法按照 test1
的当前管道将数据插入 test2
table.
示例 - 在 CSV 源文件中,
line 1 - 9,atm,mun,ronaldo
line 2- 10,atm,mun,bravo,num2
line 3 - 11,atm,sign,bravo,sick
在此示例中,要拆分的值是 column 4
(ronaldo
或 bravo
)
所有这些数据应分别加载到 table 1
、table 2
、table 3
关键是第 4 列。
if col4==ronaldo, go to table 1
if col4==bravo and col3==mun, go to table 2
if col4==bravo and col3 ==sign go to table 3
我是 Kafka 的新手,从上周开始进行 Kafka 开发。
您可以编写一个单独的 Kafka Streams 应用程序,使用 KStream#branch()
operator:
KStream<K, V>[] branches = streamsBuilder.branch(
(key, value) -> {filter logic for topic 1 here},
(key, value) -> {filter logic for topic 2 here},
(key, value) -> true//get all messages for this branch
);
// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3
或者您可以像这样手动分支您的 KStream:
KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));
inputKStream
.filter((key, value) -> {filter logic for topic 1 here})
.to("your_1st_output_topic");
inputKStream
.filter((key, value) -> {filter logic for topic 2 here})
.to("your_2nd_output_topic");
...
我能够拆分数据并将 KSQL 用于我在下面分享的方法。
1. 使用 value_format='JSON'
和列 payload
作为 STRING
创建输入流
2. 有效负载将包含整个记录作为 STRING
3. 然后使用 WHERE
子句中的 LIKE
运算符将记录拆分为不同的流,同时根据要求将有效负载放入不同的流中。在这里,我使用了 KSQL 的 SPLIT
运算符从有效载荷中获取逗号分隔格式的记录