ETL 到 csv 文件,拆分然后推送到 s3 以供 redshift 使用
ETL to csv files, split up and then pushed to s3 to be consume by redshift
Just getting started with Kiba, didn't find anything obvious, but I could be just channeling my inner child (who looks for their shoes by staring at the ceiling).
我想将一个非常大的 table 转储到 Amazon Redshift。似乎最快的方法是将一堆 CSV 文件写入 S3 存储桶,然后告诉 Redshift(通过 COPY
命令)将它们拉入。神奇的缩放 gremlins 将完成剩下的工作。
所以,我想我希望 Kiba 为每 10k 行数据写入一个 CSV 文件,然后将其推送到 s3,然后开始写入一个新文件。最后,对 COPY
进行 post 处理调用
那么,我可以 "pipeline" 工作还是应该是一个大的嵌套 Destination class?
即
source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process
我不确定这里的确切问题。但是,我认为您的解决方案总体上似乎是正确的,但建议很少。
- 您可能也不想考虑每个 CSV 文件有超过 10K 条记录,并且在发送到 S3 时
gzip
它们。
- 您想查看
menifest
创建包含多个文件的列表,然后 运行 copy
命令提供 menifest
文件作为输入。
Kiba 作者在这里。感谢您试用!
目前,实现这一点的最佳方法是创建我称之为 "buffering destination" 的东西。 (某个版本可能会在某个时候出现在 Kiba Common 中)。
(请彻底测试,我今天早上刚刚为你创作了这个,根本没有运行,尽管我过去使用的通用版本较少。也请保持请记住,此版本为您的 10k 行使用内存缓冲区,因此将数字增加到更大的数量会消耗内存。不过,也可以创建内存消耗最少的版本,它会在您获取行时将行写入文件)
class BufferingDestination
def initialize(buffer_size:, on_flush:)
@buffer = []
@buffer_size
@on_flush = on_flush
@batch_index = 0
end
def write(row)
@buffer << row
flush if @buffer.size >= buffer_size
end
def flush
on_flush.call(batch_index: @batch_index, rows: @buffer)
@batch_index += 1
@buffer.clear
end
def close
flush
end
end
这是你可以像这样使用的东西,例如这里重复使用 Kiba Common CSV destination(尽管你也可以编写自己的):
require 'kiba-common/destinations/csv'
destination BufferingDestination,
buffer_size: 10_000,
on_flush: -> { |batch_index, rows|
filename = File.join("output-#{sprintf("%08d", batch_index)}")
csv = Kiba::Common::Destinations::CSV.new(
filename: filename,
csv_options: { ... },
headers: %w(my fields here)
)
rows.each { |r| csv.write(r) }
csv.close
}
您可以在生成文件后在 on_flush
块中触发您的 COPY
(如果您希望立即开始上传),或者在 post_process
块中(但这只会在所有 CSV 准备就绪后才开始,如果您愿意,这可以作为确保某种形式的事务性全局上传的功能)。
如果你真的需要这个,你可以花点时间启动一个线程队列来实际并行处理上传(但要小心僵尸线程等)。
另一种方法是使用 "multiple steps" ETL 进程,一个脚本生成 CSV,另一个脚本选择它们进行上传,同时 运行ning(这是我在例如我在 RubyKaigi 2018 的演讲)。
让我知道您的工作方式!
Thibaut,我做了类似的事情,只是我将它流式传输到一个临时文件,我想...
require 'csv'
# @param limit [Integer, 1_000] Number of rows per csv file
# @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
# each csv file is finished
module PacerPro
class CSVDestination
def initialize(limit: 1_000, callback: ->(obj) { })
@limit = limit
@callback = callback
@csv = nil
@row_count = 0
end
# @param row [Hash] returned from transforms
def write(row)
csv << row.values
@row_count += 1
return if row_count < limit
self.close
end
# Called by Kiba when the transform pipeline is finished
def close
csv.close
callback.call(csv)
tempfile.unlink
@csv = nil
@row_count = 0
end
private
attr_reader :limit, :callback
attr_reader :row_count, :tempfile
def csv
@csv ||= begin
@tempfile = Tempfile.new('csv')
CSV.open(@tempfile, 'w')
end
end
end
end
Just getting started with Kiba, didn't find anything obvious, but I could be just channeling my inner child (who looks for their shoes by staring at the ceiling).
我想将一个非常大的 table 转储到 Amazon Redshift。似乎最快的方法是将一堆 CSV 文件写入 S3 存储桶,然后告诉 Redshift(通过 COPY
命令)将它们拉入。神奇的缩放 gremlins 将完成剩下的工作。
所以,我想我希望 Kiba 为每 10k 行数据写入一个 CSV 文件,然后将其推送到 s3,然后开始写入一个新文件。最后,对 COPY
那么,我可以 "pipeline" 工作还是应该是一个大的嵌套 Destination class?
即
source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process
我不确定这里的确切问题。但是,我认为您的解决方案总体上似乎是正确的,但建议很少。
- 您可能也不想考虑每个 CSV 文件有超过 10K 条记录,并且在发送到 S3 时
gzip
它们。 - 您想查看
menifest
创建包含多个文件的列表,然后 运行copy
命令提供menifest
文件作为输入。
Kiba 作者在这里。感谢您试用!
目前,实现这一点的最佳方法是创建我称之为 "buffering destination" 的东西。 (某个版本可能会在某个时候出现在 Kiba Common 中)。
(请彻底测试,我今天早上刚刚为你创作了这个,根本没有运行,尽管我过去使用的通用版本较少。也请保持请记住,此版本为您的 10k 行使用内存缓冲区,因此将数字增加到更大的数量会消耗内存。不过,也可以创建内存消耗最少的版本,它会在您获取行时将行写入文件)
class BufferingDestination
def initialize(buffer_size:, on_flush:)
@buffer = []
@buffer_size
@on_flush = on_flush
@batch_index = 0
end
def write(row)
@buffer << row
flush if @buffer.size >= buffer_size
end
def flush
on_flush.call(batch_index: @batch_index, rows: @buffer)
@batch_index += 1
@buffer.clear
end
def close
flush
end
end
这是你可以像这样使用的东西,例如这里重复使用 Kiba Common CSV destination(尽管你也可以编写自己的):
require 'kiba-common/destinations/csv'
destination BufferingDestination,
buffer_size: 10_000,
on_flush: -> { |batch_index, rows|
filename = File.join("output-#{sprintf("%08d", batch_index)}")
csv = Kiba::Common::Destinations::CSV.new(
filename: filename,
csv_options: { ... },
headers: %w(my fields here)
)
rows.each { |r| csv.write(r) }
csv.close
}
您可以在生成文件后在 on_flush
块中触发您的 COPY
(如果您希望立即开始上传),或者在 post_process
块中(但这只会在所有 CSV 准备就绪后才开始,如果您愿意,这可以作为确保某种形式的事务性全局上传的功能)。
如果你真的需要这个,你可以花点时间启动一个线程队列来实际并行处理上传(但要小心僵尸线程等)。
另一种方法是使用 "multiple steps" ETL 进程,一个脚本生成 CSV,另一个脚本选择它们进行上传,同时 运行ning(这是我在例如我在 RubyKaigi 2018 的演讲)。
让我知道您的工作方式!
Thibaut,我做了类似的事情,只是我将它流式传输到一个临时文件,我想...
require 'csv'
# @param limit [Integer, 1_000] Number of rows per csv file
# @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
# each csv file is finished
module PacerPro
class CSVDestination
def initialize(limit: 1_000, callback: ->(obj) { })
@limit = limit
@callback = callback
@csv = nil
@row_count = 0
end
# @param row [Hash] returned from transforms
def write(row)
csv << row.values
@row_count += 1
return if row_count < limit
self.close
end
# Called by Kiba when the transform pipeline is finished
def close
csv.close
callback.call(csv)
tempfile.unlink
@csv = nil
@row_count = 0
end
private
attr_reader :limit, :callback
attr_reader :row_count, :tempfile
def csv
@csv ||= begin
@tempfile = Tempfile.new('csv')
CSV.open(@tempfile, 'w')
end
end
end
end