如何在提取器中过滤数据?

How to filter data in extractor?

我有一个很长的 运行ning 管道,其中有一些失败的项目(在流程结束时未加载的项目,因为它们未通过数据库验证或类似的事情)。

我想重新运行 管道,但只处理最后 运行 导入失败的项目。

我有一个系统可以检查每个项目 ID(我从外部来源收到的)。我在我的装载机中做这个检查。如果数据库中已有该项目 ID,我将跳过 loading/inserting 数据库中的该项目。

效果很好。但是,它很慢,因为我对这些项目中的每一个都进行了提取-转换-加载,然后才在加载时查询数据库(每个项目一个查询)并比较项目 ID。

我想尽快过滤掉这些记录。如果我在 transformer 中做,我只能对每个项目再做一次。看起来提取器可能是合适的地方,或者我可以将记录分批传递给转换器,然后过滤+分解(第一个)转换器中的项目。

这里有什么更好的方法?

我也在考虑提取器的可重用性,但我想我可以接受一个提取器既提取又过滤的事实。我认为最好的解决方案是能够链接多个提取器。然后我会有一个提取数据的和另一个过滤数据的。

编辑:也许我可以这样做:

already_imported_item_ids = Items.pluck(:item_id)

Kiba.run(
  Kiba.parse do
    source(...)

    transform do |item|
      next if already_imported_item_ids.include?(item)

      item
    end

    transform(...)
    destination(...)
  end
)

我想这行得通吗?

一些提示:

  1. 管道中越高(越快)越好。如果你能找到一种方法,从源头上过滤掉,成本会更低,因为你根本不需要对数据进行操作。

  2. 如果比例足够小,则可以在 pre_process 块的开头仅加载完整的 ID 列表(主要是您在代码示例中想到的内容) ,然后在源之后进行比较。显然它不会无限扩展,但它可以工作很长时间,具体取决于你的数据集大小。

  3. 如果您需要更高的规模,我建议您使用缓冲转换(将 N 行分组)来实现单个 SQL 查询以验证是否存在目标数据库中的所有 N 行 ID,或者使用行组然后确实爆炸。