使用 kiba ETL 检查 CSV 文件的 headers 的最佳位置
Best place to check headers for a CSV file in with kiba ETL
我需要检查:
- header 行存在
- header 包含一组特定的 headers
最好的地方是什么。我有一些可能的解决方案,但不知道更惯用的解决方案
- 在 运行 之前检查完整的 ETL,例如在
Kiba.parse
块之前
- 签入 ETL
中的 pre_process
块
- 检查 ETL 源。我倾向于更喜欢这个,因为它会更可重用(需要将必填字段作为参数传递)
请注意,即使我可以在 transform
块中签入 row
上可用的字段,此解决方案似乎也不是很有效,因为它会 运行 每行。
感谢任何提示
有多种非常惯用的方法可以实现这一点:
在源代码级别(传递 headers 的数组)
您可以在不使用 headers: true
的情况下使用 CSV
,这提供了仔细检查 headers:
的机会
class CSVSource
def initialize(filename:, csv_options:, expected_headers:)
# SNIP
def each
CSV.foreach(filename, csv_options).with_index do |row, file_row_index|
if file_row_index == 0
check_headers!(actual: row.to_a, expected: expected_headers)
next # do not propagate the headers row
else
yield(Hash[expected_headers.zip(row.to_a)])
end
end
end
def check_headers!(actual:, expected:)
# SNIP - verify uniqueness, presence, raise a clear message if needed
end
在源代码级别(让调用者使用 lambda 定义行为)
class CSVSource
def initialize(after_headers_read_callback:, ...)
@after_headers_read_callback = ...
def each
CSV.foreach(filename, csv_options).with_index do |row, file_row_index|
if file_row_index == 0
@after_headers_read_callback.call(row.to_a)
next
end
# ...
end
end
lambda 将让调用者定义他们自己的检查,如果需要则引发等,这更利于重用。
在变换级别
如果您想进一步分离组件(例如,将 headers 处理与行来自 CSV 源的事实分开),您可以使用转换。
我经常使用这种设计,它可以更好地重用(这里使用 CSV 源,会产生一些 meta-data):
def transform_array_rows_to_hash_rows(after_headers_read_callback:)
transform do |row|
if row.fetch(:file_row_index) == 0
@headers = row.fetch(:row)
after_headers_read_callback.call(@headers)
nil
else
Hash[@headers.zip(row.fetch(:row))].merge(
filename: row.fetch(:filename),
file_row_index: row.fetch(:file_row_index)
)
end
end
end
不推荐的内容
在所有情况下,避免在 Kiba.parse
本身进行任何处理。这是一个更好的设计,以确保 IO 仅在您调用 Kiba.run
时发生(因为它会更多 future-proof 并且将支持后续版本的 Kiba 中的内省功能)。
此外,不推荐使用 pre_process
(虽然它会起作用),因为它会导致一些重复等。
希望这对您有所帮助,如果不清楚请告诉我!
我需要检查:
- header 行存在
- header 包含一组特定的 headers
最好的地方是什么。我有一些可能的解决方案,但不知道更惯用的解决方案
- 在 运行 之前检查完整的 ETL,例如在
Kiba.parse
块之前 - 签入 ETL 中的
- 检查 ETL 源。我倾向于更喜欢这个,因为它会更可重用(需要将必填字段作为参数传递)
pre_process
块
请注意,即使我可以在 transform
块中签入 row
上可用的字段,此解决方案似乎也不是很有效,因为它会 运行 每行。
感谢任何提示
有多种非常惯用的方法可以实现这一点:
在源代码级别(传递 headers 的数组)
您可以在不使用 headers: true
的情况下使用 CSV
,这提供了仔细检查 headers:
class CSVSource
def initialize(filename:, csv_options:, expected_headers:)
# SNIP
def each
CSV.foreach(filename, csv_options).with_index do |row, file_row_index|
if file_row_index == 0
check_headers!(actual: row.to_a, expected: expected_headers)
next # do not propagate the headers row
else
yield(Hash[expected_headers.zip(row.to_a)])
end
end
end
def check_headers!(actual:, expected:)
# SNIP - verify uniqueness, presence, raise a clear message if needed
end
在源代码级别(让调用者使用 lambda 定义行为)
class CSVSource
def initialize(after_headers_read_callback:, ...)
@after_headers_read_callback = ...
def each
CSV.foreach(filename, csv_options).with_index do |row, file_row_index|
if file_row_index == 0
@after_headers_read_callback.call(row.to_a)
next
end
# ...
end
end
lambda 将让调用者定义他们自己的检查,如果需要则引发等,这更利于重用。
在变换级别
如果您想进一步分离组件(例如,将 headers 处理与行来自 CSV 源的事实分开),您可以使用转换。
我经常使用这种设计,它可以更好地重用(这里使用 CSV 源,会产生一些 meta-data):
def transform_array_rows_to_hash_rows(after_headers_read_callback:)
transform do |row|
if row.fetch(:file_row_index) == 0
@headers = row.fetch(:row)
after_headers_read_callback.call(@headers)
nil
else
Hash[@headers.zip(row.fetch(:row))].merge(
filename: row.fetch(:filename),
file_row_index: row.fetch(:file_row_index)
)
end
end
end
不推荐的内容
在所有情况下,避免在 Kiba.parse
本身进行任何处理。这是一个更好的设计,以确保 IO 仅在您调用 Kiba.run
时发生(因为它会更多 future-proof 并且将支持后续版本的 Kiba 中的内省功能)。
此外,不推荐使用 pre_process
(虽然它会起作用),因为它会导致一些重复等。
希望这对您有所帮助,如果不清楚请告诉我!