使用 Apache Beam 数据流从 CDC 记录的 PCollection 中合并填充最新值的单行
Consolidating a single row filling latest value from PCollection of CDC records using Apache Beam dataflow
更改数据捕获 (CDC) 记录不会包含记录中列的所有值。有可能对于记录的主键,比如 R1,我们可以为 R1 获取带有 CDC 时间戳的 CDC 记录的 PCollection。
前任。如果记录 R1 有 C1、C2、C3、C4 列,CDCTimestamp
CDC 记录将
R1,C1.1,--,C3.1,--,10:02
R1,C1.2,C2.2,--,C4.2,10:03
R1,--,C2.3,--,C4.3,10:04
R2,C2.1,--,C3.1,--,10:03
当Beam流水线处理时我需要得到如下输出
包含的PCollection
R1、C1.2、C2.3、C3.1、C4.3、10:04
R2,C2.1,--,C3.1,--,10:03
任何指点将不胜感激!谢谢。
不确定我是否正确理解你的问题。
那
呢
1)首先将其转化为KV对:
R1: C1.1,--,C3.1,--,10:02
R1: C1.2,C2.2,--,C4.2,10:03
R1: --,C2.3,--,C4.3,10:04
R2: C2.1,--,C3.1,--,10:03
2) 然后做一个GroupBykey:
R1-> C1.1,--,C3.1,--,10:02
C1.2,C2.2,--,C4.2,10:03
--,C2.3,--,C4.3,10:04
R2-> C2.1,--,C3.1,--,10:03
3) 然后对于每个key,处理多个元素,将它们转化为一个。
感谢 Ruoyun Huang 为我指明了正确的方向...如果其他人有兴趣解决类似问题,请附上伪代码。
PCollection<TableRow> rowsFromBigQuery =
pipeline.apply(
BigQueryIO.readTableRows()
.fromQuery(QUERY)
.withoutValidation()
.usingStandardSql()
.withMethod(Method.DIRECT_READ));
//.withReadOptions(tableReadOptions));
// Seggregate by keys
PCollection<KV<String, TableRow>> rowByKeys =
rowsFromBigQuery.apply("Seggregating by row keys", ParDo.of(new CreateKeyValue()));
// Group by keys
PCollection<KV<String, Iterable<TableRow>>> groupByKeys =
rowByKeys.apply("Grouping by row keys", GroupByKey.<String, TableRow>create());
// Consolidate CDC
PCollection<TableRow> cdcConsolidatedRows =
groupByKeys.apply("Consolidate CDC by row keys", ParDo.of(new ConsolidateCDC()));
CreateKeyValue 和 ConsolidateCDC 的签名如下:
static class ConsolidateCDC extends DoFn <KV<String, Iterable<TableRow>>, TableRow> and static class CreateKeyValue extends DoFn<TableRow, KV<String, TableRow>>
更改数据捕获 (CDC) 记录不会包含记录中列的所有值。有可能对于记录的主键,比如 R1,我们可以为 R1 获取带有 CDC 时间戳的 CDC 记录的 PCollection。
前任。如果记录 R1 有 C1、C2、C3、C4 列,CDCTimestamp
CDC 记录将
R1,C1.1,--,C3.1,--,10:02
R1,C1.2,C2.2,--,C4.2,10:03
R1,--,C2.3,--,C4.3,10:04
R2,C2.1,--,C3.1,--,10:03
当Beam流水线处理时我需要得到如下输出
包含的PCollection
R1、C1.2、C2.3、C3.1、C4.3、10:04
R2,C2.1,--,C3.1,--,10:03
任何指点将不胜感激!谢谢。
不确定我是否正确理解你的问题。
那
呢1)首先将其转化为KV对:
R1: C1.1,--,C3.1,--,10:02
R1: C1.2,C2.2,--,C4.2,10:03
R1: --,C2.3,--,C4.3,10:04
R2: C2.1,--,C3.1,--,10:03
2) 然后做一个GroupBykey:
R1-> C1.1,--,C3.1,--,10:02
C1.2,C2.2,--,C4.2,10:03
--,C2.3,--,C4.3,10:04
R2-> C2.1,--,C3.1,--,10:03
3) 然后对于每个key,处理多个元素,将它们转化为一个。
感谢 Ruoyun Huang 为我指明了正确的方向...如果其他人有兴趣解决类似问题,请附上伪代码。
PCollection<TableRow> rowsFromBigQuery =
pipeline.apply(
BigQueryIO.readTableRows()
.fromQuery(QUERY)
.withoutValidation()
.usingStandardSql()
.withMethod(Method.DIRECT_READ));
//.withReadOptions(tableReadOptions));
// Seggregate by keys
PCollection<KV<String, TableRow>> rowByKeys =
rowsFromBigQuery.apply("Seggregating by row keys", ParDo.of(new CreateKeyValue()));
// Group by keys
PCollection<KV<String, Iterable<TableRow>>> groupByKeys =
rowByKeys.apply("Grouping by row keys", GroupByKey.<String, TableRow>create());
// Consolidate CDC
PCollection<TableRow> cdcConsolidatedRows =
groupByKeys.apply("Consolidate CDC by row keys", ParDo.of(new ConsolidateCDC()));
CreateKeyValue 和 ConsolidateCDC 的签名如下:
static class ConsolidateCDC extends DoFn <KV<String, Iterable<TableRow>>, TableRow> and static class CreateKeyValue extends DoFn<TableRow, KV<String, TableRow>>