使用 Kiba 时是否有明显的减少行数的方法?

Is there an obvious way to reduce rows when using Kiba?

首先 - Thibaut,感谢 Kiba。它与 'enterprise' 级 ETL 工具相得益彰,从未让我失望。

我正忙于构建一个 ETL 管道,该管道需要大量行,并将它们缩减为一个摘要行。我觉得这应该是一件简单的事情,但我对如何解决这个问题有点难过。

我们有许多来自语音交换机的 CDR,需要根据一些简单的标准将它们压缩成少量摘要记录。所以,问题是;我有数千条来自 Source 的记录,需要根据一些减少标准将它们转换为少数记录。

当存在一对一 Source -> Destination ETL,甚至是一对多 Source -> [=12= 时,Kiba 非常简单] 使用 V3 中新的可枚举爆炸器,但我没有看到通向多对一 ETL 管道的清晰路径。

如有任何建议或指导,我们将不胜感激。

很高兴您发现 Kiba 很有用!这个用例有多种解决方案。

我在这里做了一些假设(如果这些假设不正确,解决方案将存在,但会有所不同,例如边界检测和外部存储):

  • 您正在处理有限的批次(而不是连续的更新流)。
  • 你说的那一小撮总结记录可以记在心里。

我的建议是利用 Kiba v3 的能力在转换的 close 方法中产生记录(在 this article 中有更深入的描述):

class InMemoryReduceTransform
  attr_reader :buffer, :summarize_cb

  def initialize(summarize_cb:)
    @buffer = []
    @summarize_cb = summarize_cb
  end

  def process(row)
    buffer << row
    nil # do not forward the row to the rest of the pipeline
  end

  def close
    summarize_cb(buffer).each do |row|
      yield row
    end
  end
end

本质上,您只是堆叠输入行,直到源中没有数据,此时将调用 close 方法,然后汇总您拥有的数据并 yield N 个汇总行。

注意:这是一个让您走上正确轨道的简单实施。 Kiba Pro 的下一个迭代将包括一个更具扩展性和通用性的版本(有供应商支持)。有兴趣的请联系!

让我知道这是否正确回答了您的问题!