使用 Kiba-ETL 将 table 转换为集合的散列

Transforming a table into a hash of sets using Kiba-ETL

我正忙于处理 ETL 管道,但对于这个特定问题,我需要获取 table 数据,并将每一列变成一个集合 - 即一个唯一数组。

我正在努力思考如何在 Kiba 框架内实现这一目标。

这是我要实现的目标的本质:

来源:

[
  { dairy: "Milk",   protein: "Steak",   carb: "Potatoes" },
  { dairy: "Milk",   protein: "Eggs",    carb: "Potatoes" },
  { dairy: "Cheese", protein: "Steak",   carb: "Potatoes" },
  { dairy: "Cream",  protein: "Chicken", carb: "Potatoes" },
  { dairy: "Milk",   protein: "Chicken", carb: "Pasta" },
]

目的地

{
  dairy:   ["Milk", "Cheese", "Cream"],
  protein: ["Steak", "Eggs", "Chicken"],
  carb:    ["Potatoes", "Pasta"],
}

这样的事情 a) 在 Kiba 可行,b) 在 Kiba 甚至可取吗?

如有任何帮助,我们将不胜感激。

更新 - 部分解决。

我找到了部分解决方案。这个转换器 class 会将行的 table 转换为集合的散列,但我一直在研究如何使用 ETL 目标获取数据。我怀疑我使用 Kiba 的方式不适合使用它。

class ColumnSetTransformer
  def initialize
    @col_set = Hash.new(Set.new)
  end

  def process(row)
    row.each do |col, col_val|
      @col_set[col] = @col_set[col] + [col_val]
    end

    @col_set
  end
end 

好的 - 因此,在工作环境中使用 Kiba 似乎并不是该工具的预期用途。之所以想用Kiba,是因为我已经为这个项目实现了很多相关的E、T、L代码,复用量很大。

所以,如果我有代码可以重用,但我不能在 Kiba 框架内使用它,我可以像调用普通代码一样调用它。这都要归功于 Thibaut 极简的设计!

这是我解决问题的方法:

source  = CSVOrXLSXSource.new("data.xlsx", document_config: { some: :settings })
xformer = ColumnSetTransformer.new

source.each do |row|
  xformer.process(row)
end

p xformer.col_set # col_set must be attr_reader on this class.

现在我可以轻松转换我的数据:)

你的解决方案会工作得很好,事实上在 Kiba 中有这样一个设计的原因(主要是 "Plain Old Ruby Objects")是为了让你自己调用组件变得容易,如果你需要的话! (这对测试非常有用!)。

也就是说这里有一些额外的可能性。

你所做的是一种聚合形式,可以通过多种方式实现。

缓冲目的地

这里的缓冲区实际上是一行。使用如下代码:

class MyBufferingDestination
  attr_reader :single_output_row

  def initialize(config:)
    @single_output_row = []
  end

  def write(row)
    row.each do |col, col_val|
      single_output_row[col] += [col_val]
    end
  end

  def close # will be called by Kiba at the end of the run
    # here you'd write your output
  end
end

使用实例变量聚合+post_process块

pre_process do
  @output_row = {}
end

transform do |row|
  row.each do |col, col_val|
    @output_row = # SNIP
  end      
  row
end

post_process do
  # convert @output_row to something
  # you can invoke a destination manually, or do something else
end

很快可能:使用缓冲转换

here所述,很快就可以创建缓冲转换,以更好地将聚合机制与目标本身分离。

它将是这样的:

class MyAggregatingTransform
  def process(row)
    @aggregate += xxx
    nil # remove the row from the pipeline
  end

  def close
    # not yet possible, but soon
    yield @aggregate
  end
end

这将是最好的设计,因为这样您就可以重用现有目的地,而无需修改它们以支持缓冲,因此它们将变得更加通用和可重用:

transform MyAggregatingTransform

destination MyJSONDestination, file: "some.json"

通过检测输入数据集中的边界并相应地产生,甚至可以在目标中包含多行。

一旦可行,我将更新 SO 答案。