重新排序 Kiba 作业中的行

Reorder rows in a Kiba job

我有一个 kiba 作业,需要一个 CSV 文件(Kiba::Common::Sources::CSV),丰富它的数据,合并一些行(ChainableAggregateDestination 目的地描述 here)并保存它到另一个 CSV 文件(Kiba::Common::Destinations::CSV)。

现在,我想对目标 CSV 中的行进行不同的排序(基于第一列)。我找不到一种方法来编写执行此操作的转换。我可以使用 post_process 重新打开目标 CSV,对其进行排序并重写,但我想有更简洁的方法...

有人能指出我正确的方向吗?

要对行进行排序,一个好的策略是使用“聚合转换”,如 this article 中所述,将所有行存储在内存中(尽管您可以在内存之外进行),然后在转换“关闭”时间,对它们进行排序并在管道中重新发出它们。

这是IMO最灵活的设计。

class SortingTransform
  def initialize(config...)
    @rows = []
  end

  def process(row)
    @rows << row
    nil # do not emit rows right away
  end

  def close
    # Here: sort the rows, optionally using external
    # configuration passed at init time
    @rows.sort_by { ... }.each do |row|
      yield row
    end
  end
end

您确实也可以在辅助 ETL 作业中重新打开输出并对其进行排序,但如果第一个解决方案适合您,我通常更喜欢它。