使用 Apache Beam 数据流从 CDC 记录的 PCollection 中合并填充最新值的单行

Consolidating a single row filling latest value from PCollection of CDC records using Apache Beam dataflow

更改数据捕获 (CDC) 记录不会包含记录中列的所有值。有可能对于记录的主键,比如 R1,我们可以为 R1 获取带有 CDC 时间戳的 CDC 记录的 PCollection。
前任。如果记录 R1 有 C1、C2、C3、C4 列,CDCTimestamp

CDC 记录将
R1,C1.1,--,C3.1,--,10:02
R1,C1.2,C2.2,--,C4.2,10:03
R1,--,C2.3,--,C4.3,10:04
R2,C2.1,--,C3.1,--,10:03

当Beam流水线处理时我需要得到如下输出 包含的PCollection R1、C1.2、C2.3、C3.1、C4.3、10:04
R2,C2.1,--,C3.1,--,10:03

任何指点将不胜感激!谢谢。

不确定我是否正确理解你的问题。

1)首先将其转化为KV对:

R1: C1.1,--,C3.1,--,10:02
R1: C1.2,C2.2,--,C4.2,10:03
R1: --,C2.3,--,C4.3,10:04
R2: C2.1,--,C3.1,--,10:03

2) 然后做一个GroupBykey:

R1-> C1.1,--,C3.1,--,10:02
     C1.2,C2.2,--,C4.2,10:03
     --,C2.3,--,C4.3,10:04
R2-> C2.1,--,C3.1,--,10:03

3) 然后对于每个key,处理多个元素,将它们转化为一个。

感谢 Ruoyun Huang 为我指明了正确的方向...如果其他人有兴趣解决类似问题,请附上伪代码。

PCollection<TableRow> rowsFromBigQuery =
           pipeline.apply(
                BigQueryIO.readTableRows()
                    .fromQuery(QUERY)
                    .withoutValidation()
                    .usingStandardSql()
                   .withMethod(Method.DIRECT_READ));
                   //.withReadOptions(tableReadOptions));
// Seggregate by keys
PCollection<KV<String, TableRow>> rowByKeys =
            rowsFromBigQuery.apply("Seggregating by row keys", ParDo.of(new CreateKeyValue()));
// Group by keys
PCollection<KV<String, Iterable<TableRow>>> groupByKeys =
            rowByKeys.apply("Grouping by row keys", GroupByKey.<String, TableRow>create());
// Consolidate CDC
PCollection<TableRow> cdcConsolidatedRows =
            groupByKeys.apply("Consolidate CDC by row keys", ParDo.of(new ConsolidateCDC()));

CreateKeyValue 和 ConsolidateCDC 的签名如下:

static class ConsolidateCDC extends DoFn <KV<String, Iterable<TableRow>>, TableRow> and static class CreateKeyValue extends DoFn<TableRow, KV<String, TableRow>>