重新排序 Kiba 作业中的行
Reorder rows in a Kiba job
我有一个 kiba 作业,需要一个 CSV 文件(Kiba::Common::Sources::CSV
),丰富它的数据,合并一些行(ChainableAggregateDestination
目的地描述 here)并保存它到另一个 CSV 文件(Kiba::Common::Destinations::CSV
)。
现在,我想对目标 CSV 中的行进行不同的排序(基于第一列)。我找不到一种方法来编写执行此操作的转换。我可以使用 post_process
重新打开目标 CSV,对其进行排序并重写,但我想有更简洁的方法...
有人能指出我正确的方向吗?
要对行进行排序,一个好的策略是使用“聚合转换”,如 this article 中所述,将所有行存储在内存中(尽管您可以在内存之外进行),然后在转换“关闭”时间,对它们进行排序并在管道中重新发出它们。
这是IMO最灵活的设计。
class SortingTransform
def initialize(config...)
@rows = []
end
def process(row)
@rows << row
nil # do not emit rows right away
end
def close
# Here: sort the rows, optionally using external
# configuration passed at init time
@rows.sort_by { ... }.each do |row|
yield row
end
end
end
您确实也可以在辅助 ETL 作业中重新打开输出并对其进行排序,但如果第一个解决方案适合您,我通常更喜欢它。
我有一个 kiba 作业,需要一个 CSV 文件(Kiba::Common::Sources::CSV
),丰富它的数据,合并一些行(ChainableAggregateDestination
目的地描述 here)并保存它到另一个 CSV 文件(Kiba::Common::Destinations::CSV
)。
现在,我想对目标 CSV 中的行进行不同的排序(基于第一列)。我找不到一种方法来编写执行此操作的转换。我可以使用 post_process
重新打开目标 CSV,对其进行排序并重写,但我想有更简洁的方法...
有人能指出我正确的方向吗?
要对行进行排序,一个好的策略是使用“聚合转换”,如 this article 中所述,将所有行存储在内存中(尽管您可以在内存之外进行),然后在转换“关闭”时间,对它们进行排序并在管道中重新发出它们。
这是IMO最灵活的设计。
class SortingTransform
def initialize(config...)
@rows = []
end
def process(row)
@rows << row
nil # do not emit rows right away
end
def close
# Here: sort the rows, optionally using external
# configuration passed at init time
@rows.sort_by { ... }.each do |row|
yield row
end
end
end
您确实也可以在辅助 ETL 作业中重新打开输出并对其进行排序,但如果第一个解决方案适合您,我通常更喜欢它。