如何比较两个版本的 delta table 得到类似于 CDC 的变化?

How to compare two versions of delta table to get changes similar to CDC?

如果我想使用增量时间旅行来比较两个版本以获得类似于CDC的更改,该怎么做?

我可以看到两个选项:

  1. 在 SQL 中您有 EXCEPT/MINUS 查询,您将所有数据与另一个 table 进行比较。我假设你也可以使用它,对吗?但是,如果您比较的版本越来越大并且您总是需要将所有版本与最新版本的所有行进行比较,那么速度是否足够快?

  2. Delta 是在对每行进行某种哈希运算并且速度非常快,还是 Delta 非常耗时?


发现于 slack

您可以计算 table 的两个版本的差异,但如您所料,这样做的成本很高。当 delta table 有变化而不是追加时,计算实际差异也很棘手。

通常当人们问起这个问题时,他们正在尝试设计自己的系统,为他们提供从 delta 到某个地方的数据的恰好一次处理; spark streaming + Delta 源已经存在可以做到这一点

如果你确实想自己编写,你可以直接读取事务日志(协议规范在 https://github.com/delta-io/delta/blob/master/PROTOCOL.md)并使用你正在计算的两个版本之间的操作来找出哪个文件有更改需要读取


请注意,增量 table 的版本已缓存(由 Spark 保留),因此比较不同的数据集应该相当便宜。

val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI

获得那些 v0 和 v1 并不昂贵;比较两者可能既昂贵又棘手。如果 table 是仅附加的,那么它是 (v1 - v0);如果它有更新插入,那么你还必须处理 (v0 - v1),如果它有元数据或协议更改,它会变得更加棘手。

当您自己执行所有这些逻辑时,它与重新实现 DeltaSource 非常相似。


您可以考虑以下几点:

val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state

val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state

actionsAtV0actionsAtV1分别是将deltatable带到版本0和1的所有动作,可以认为是delta[=35=的CDC ].

这基本上就是读取事务日志,除了使用一些 Delta 的内部 API 来简化它。

Databricks 最近添加了 Change Data Feed(以前称为 Delta Change Data Capture),它似乎直接解决了这个用例 -

https://docs.databricks.com/release-notes/runtime/8.2.html#incrementally-ingest-updates-and-deletions-in-delta-tables-using-a-change-data-feed-public-preview

The change data feed of a Delta table represents the row-level changes between different versions of the table. When enabled, the runtime records additional information regarding row-level changes for every write operation on the table. You can query these changes through SQL and DataFrame and DataStream readers. The feed enables:

  • Efficient downstream consumption of merge, updates, and deletes. Getting rows that were updated, inserted, or deleted greatly improves the performance of the downstream job consuming the output of the merge as entire files need not be processed and deduplicated now.
  • Maintaining sync between replicas of two different tables representing the same data. It is a common practice to maintain two versions of the same table one narrow table as the source of truth and a wider table with additional data. Changes can be efficiently applied from the narrow table to the wider table.