如何在提取器中过滤数据?
How to filter data in extractor?
我有一个很长的 运行ning 管道,其中有一些失败的项目(在流程结束时未加载的项目,因为它们未通过数据库验证或类似的事情)。
我想重新运行 管道,但只处理最后 运行 导入失败的项目。
我有一个系统可以检查每个项目 ID(我从外部来源收到的)。我在我的装载机中做这个检查。如果数据库中已有该项目 ID,我将跳过 loading/inserting 数据库中的该项目。
效果很好。但是,它很慢,因为我对这些项目中的每一个都进行了提取-转换-加载,然后才在加载时查询数据库(每个项目一个查询)并比较项目 ID。
我想尽快过滤掉这些记录。如果我在 transformer 中做,我只能对每个项目再做一次。看起来提取器可能是合适的地方,或者我可以将记录分批传递给转换器,然后过滤+分解(第一个)转换器中的项目。
这里有什么更好的方法?
我也在考虑提取器的可重用性,但我想我可以接受一个提取器既提取又过滤的事实。我认为最好的解决方案是能够链接多个提取器。然后我会有一个提取数据的和另一个过滤数据的。
编辑:也许我可以这样做:
already_imported_item_ids = Items.pluck(:item_id)
Kiba.run(
Kiba.parse do
source(...)
transform do |item|
next if already_imported_item_ids.include?(item)
item
end
transform(...)
destination(...)
end
)
我想这行得通吗?
一些提示:
管道中越高(越快)越好。如果你能找到一种方法,从源头上过滤掉,成本会更低,因为你根本不需要对数据进行操作。
如果比例足够小,则可以在 pre_process
块的开头仅加载完整的 ID 列表(主要是您在代码示例中想到的内容) ,然后在源之后进行比较。显然它不会无限扩展,但它可以工作很长时间,具体取决于你的数据集大小。
如果您需要更高的规模,我建议您使用缓冲转换(将 N 行分组)来实现单个 SQL 查询以验证是否存在目标数据库中的所有 N 行 ID,或者使用行组然后确实爆炸。
我有一个很长的 运行ning 管道,其中有一些失败的项目(在流程结束时未加载的项目,因为它们未通过数据库验证或类似的事情)。
我想重新运行 管道,但只处理最后 运行 导入失败的项目。
我有一个系统可以检查每个项目 ID(我从外部来源收到的)。我在我的装载机中做这个检查。如果数据库中已有该项目 ID,我将跳过 loading/inserting 数据库中的该项目。
效果很好。但是,它很慢,因为我对这些项目中的每一个都进行了提取-转换-加载,然后才在加载时查询数据库(每个项目一个查询)并比较项目 ID。
我想尽快过滤掉这些记录。如果我在 transformer 中做,我只能对每个项目再做一次。看起来提取器可能是合适的地方,或者我可以将记录分批传递给转换器,然后过滤+分解(第一个)转换器中的项目。
这里有什么更好的方法?
我也在考虑提取器的可重用性,但我想我可以接受一个提取器既提取又过滤的事实。我认为最好的解决方案是能够链接多个提取器。然后我会有一个提取数据的和另一个过滤数据的。
编辑:也许我可以这样做:
already_imported_item_ids = Items.pluck(:item_id)
Kiba.run(
Kiba.parse do
source(...)
transform do |item|
next if already_imported_item_ids.include?(item)
item
end
transform(...)
destination(...)
end
)
我想这行得通吗?
一些提示:
管道中越高(越快)越好。如果你能找到一种方法,从源头上过滤掉,成本会更低,因为你根本不需要对数据进行操作。
如果比例足够小,则可以在
pre_process
块的开头仅加载完整的 ID 列表(主要是您在代码示例中想到的内容) ,然后在源之后进行比较。显然它不会无限扩展,但它可以工作很长时间,具体取决于你的数据集大小。如果您需要更高的规模,我建议您使用缓冲转换(将 N 行分组)来实现单个 SQL 查询以验证是否存在目标数据库中的所有 N 行 ID,或者使用行组然后确实爆炸。