使用 Kiba-ETL 将 table 转换为集合的散列
Transforming a table into a hash of sets using Kiba-ETL
我正忙于处理 ETL 管道,但对于这个特定问题,我需要获取 table 数据,并将每一列变成一个集合 - 即一个唯一数组。
我正在努力思考如何在 Kiba 框架内实现这一目标。
这是我要实现的目标的本质:
来源:
[
{ dairy: "Milk", protein: "Steak", carb: "Potatoes" },
{ dairy: "Milk", protein: "Eggs", carb: "Potatoes" },
{ dairy: "Cheese", protein: "Steak", carb: "Potatoes" },
{ dairy: "Cream", protein: "Chicken", carb: "Potatoes" },
{ dairy: "Milk", protein: "Chicken", carb: "Pasta" },
]
目的地
{
dairy: ["Milk", "Cheese", "Cream"],
protein: ["Steak", "Eggs", "Chicken"],
carb: ["Potatoes", "Pasta"],
}
这样的事情 a) 在 Kiba 可行,b) 在 Kiba 甚至可取吗?
如有任何帮助,我们将不胜感激。
更新 - 部分解决。
我找到了部分解决方案。这个转换器 class 会将行的 table 转换为集合的散列,但我一直在研究如何使用 ETL 目标获取数据。我怀疑我使用 Kiba 的方式不适合使用它。
class ColumnSetTransformer
def initialize
@col_set = Hash.new(Set.new)
end
def process(row)
row.each do |col, col_val|
@col_set[col] = @col_set[col] + [col_val]
end
@col_set
end
end
好的 - 因此,在工作环境中使用 Kiba 似乎并不是该工具的预期用途。之所以想用Kiba,是因为我已经为这个项目实现了很多相关的E、T、L代码,复用量很大。
所以,如果我有代码可以重用,但我不能在 Kiba 框架内使用它,我可以像调用普通代码一样调用它。这都要归功于 Thibaut 极简的设计!
这是我解决问题的方法:
source = CSVOrXLSXSource.new("data.xlsx", document_config: { some: :settings })
xformer = ColumnSetTransformer.new
source.each do |row|
xformer.process(row)
end
p xformer.col_set # col_set must be attr_reader on this class.
现在我可以轻松转换我的数据:)
你的解决方案会工作得很好,事实上在 Kiba 中有这样一个设计的原因(主要是 "Plain Old Ruby Objects")是为了让你自己调用组件变得容易,如果你需要的话! (这对测试非常有用!)。
也就是说这里有一些额外的可能性。
你所做的是一种聚合形式,可以通过多种方式实现。
缓冲目的地
这里的缓冲区实际上是一行。使用如下代码:
class MyBufferingDestination
attr_reader :single_output_row
def initialize(config:)
@single_output_row = []
end
def write(row)
row.each do |col, col_val|
single_output_row[col] += [col_val]
end
end
def close # will be called by Kiba at the end of the run
# here you'd write your output
end
end
使用实例变量聚合+post_process块
pre_process do
@output_row = {}
end
transform do |row|
row.each do |col, col_val|
@output_row = # SNIP
end
row
end
post_process do
# convert @output_row to something
# you can invoke a destination manually, or do something else
end
很快可能:使用缓冲转换
如here所述,很快就可以创建缓冲转换,以更好地将聚合机制与目标本身分离。
它将是这样的:
class MyAggregatingTransform
def process(row)
@aggregate += xxx
nil # remove the row from the pipeline
end
def close
# not yet possible, but soon
yield @aggregate
end
end
这将是最好的设计,因为这样您就可以重用现有目的地,而无需修改它们以支持缓冲,因此它们将变得更加通用和可重用:
transform MyAggregatingTransform
destination MyJSONDestination, file: "some.json"
通过检测输入数据集中的边界并相应地产生,甚至可以在目标中包含多行。
一旦可行,我将更新 SO 答案。
我正忙于处理 ETL 管道,但对于这个特定问题,我需要获取 table 数据,并将每一列变成一个集合 - 即一个唯一数组。
我正在努力思考如何在 Kiba 框架内实现这一目标。
这是我要实现的目标的本质:
来源:
[
{ dairy: "Milk", protein: "Steak", carb: "Potatoes" },
{ dairy: "Milk", protein: "Eggs", carb: "Potatoes" },
{ dairy: "Cheese", protein: "Steak", carb: "Potatoes" },
{ dairy: "Cream", protein: "Chicken", carb: "Potatoes" },
{ dairy: "Milk", protein: "Chicken", carb: "Pasta" },
]
目的地
{
dairy: ["Milk", "Cheese", "Cream"],
protein: ["Steak", "Eggs", "Chicken"],
carb: ["Potatoes", "Pasta"],
}
这样的事情 a) 在 Kiba 可行,b) 在 Kiba 甚至可取吗?
如有任何帮助,我们将不胜感激。
更新 - 部分解决。
我找到了部分解决方案。这个转换器 class 会将行的 table 转换为集合的散列,但我一直在研究如何使用 ETL 目标获取数据。我怀疑我使用 Kiba 的方式不适合使用它。
class ColumnSetTransformer
def initialize
@col_set = Hash.new(Set.new)
end
def process(row)
row.each do |col, col_val|
@col_set[col] = @col_set[col] + [col_val]
end
@col_set
end
end
好的 - 因此,在工作环境中使用 Kiba 似乎并不是该工具的预期用途。之所以想用Kiba,是因为我已经为这个项目实现了很多相关的E、T、L代码,复用量很大。
所以,如果我有代码可以重用,但我不能在 Kiba 框架内使用它,我可以像调用普通代码一样调用它。这都要归功于 Thibaut 极简的设计!
这是我解决问题的方法:
source = CSVOrXLSXSource.new("data.xlsx", document_config: { some: :settings })
xformer = ColumnSetTransformer.new
source.each do |row|
xformer.process(row)
end
p xformer.col_set # col_set must be attr_reader on this class.
现在我可以轻松转换我的数据:)
你的解决方案会工作得很好,事实上在 Kiba 中有这样一个设计的原因(主要是 "Plain Old Ruby Objects")是为了让你自己调用组件变得容易,如果你需要的话! (这对测试非常有用!)。
也就是说这里有一些额外的可能性。
你所做的是一种聚合形式,可以通过多种方式实现。
缓冲目的地
这里的缓冲区实际上是一行。使用如下代码:
class MyBufferingDestination
attr_reader :single_output_row
def initialize(config:)
@single_output_row = []
end
def write(row)
row.each do |col, col_val|
single_output_row[col] += [col_val]
end
end
def close # will be called by Kiba at the end of the run
# here you'd write your output
end
end
使用实例变量聚合+post_process块
pre_process do
@output_row = {}
end
transform do |row|
row.each do |col, col_val|
@output_row = # SNIP
end
row
end
post_process do
# convert @output_row to something
# you can invoke a destination manually, or do something else
end
很快可能:使用缓冲转换
如here所述,很快就可以创建缓冲转换,以更好地将聚合机制与目标本身分离。
它将是这样的:
class MyAggregatingTransform
def process(row)
@aggregate += xxx
nil # remove the row from the pipeline
end
def close
# not yet possible, but soon
yield @aggregate
end
end
这将是最好的设计,因为这样您就可以重用现有目的地,而无需修改它们以支持缓冲,因此它们将变得更加通用和可重用:
transform MyAggregatingTransform
destination MyJSONDestination, file: "some.json"
通过检测输入数据集中的边界并相应地产生,甚至可以在目标中包含多行。
一旦可行,我将更新 SO 答案。