ETL 到 csv 文件,拆分然后推送到 s3 以供 redshift 使用

ETL to csv files, split up and then pushed to s3 to be consume by redshift

Just getting started with Kiba, didn't find anything obvious, but I could be just channeling my inner child (who looks for their shoes by staring at the ceiling).

我想将一个非常大的 table 转储到 Amazon Redshift。似乎最快的方法是将一堆 CSV 文件写入 S3 存储桶,然后告诉 Redshift(通过 COPY 命令)将它们拉入。神奇的缩放 gremlins 将完成剩下的工作。

所以,我想我希望 Kiba 为每 10k 行数据写入一个 CSV 文件,然后将其推送到 s3,然后开始写入一个新文件。最后,对 COPY

进行 post 处理调用

那么,我可以 "pipeline" 工作还是应该是一个大的嵌套 Destination class?

source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process

我不确定这里的确切问题。但是,我认为您的解决方案总体上似乎是正确的,但建议很少。

  1. 您可能也不想考虑每个 CSV 文件有超过 10K 条记录,并且在发送到 S3 时 gzip 它们。
  2. 您想查看 menifest 创建包含多个文件的列表,然后 运行 copy 命令提供 menifest 文件作为输入。

Kiba 作者在这里。感谢您试用!

目前,实现这一点的最佳方法是创建我称之为 "buffering destination" 的东西。 (某个版本可能会在某个时候出现在 Kiba Common 中)。

(请彻底测试,我今天早上刚刚为你创作了这个,根本没有运行,尽管我过去使用的通用版本较少。也请保持请记住,此版本为您的 10k 行使用内存缓冲区,因此将数字增加到更大的数量会消耗内存。不过,也可以创建内存消耗最少的版本,它会在您获取行时将行写入文件)

class BufferingDestination
  def initialize(buffer_size:, on_flush:)
    @buffer = []
    @buffer_size
    @on_flush = on_flush
    @batch_index = 0
  end

  def write(row)
    @buffer << row
    flush if @buffer.size >= buffer_size
  end

  def flush
    on_flush.call(batch_index: @batch_index, rows: @buffer)
    @batch_index += 1
    @buffer.clear
  end

  def close
    flush
  end
end

这是你可以像这样使用的东西,例如这里重复使用 Kiba Common CSV destination(尽管你也可以编写自己的):

require 'kiba-common/destinations/csv'

destination BufferingDestination,
  buffer_size: 10_000,
  on_flush: -> { |batch_index, rows|
    filename = File.join("output-#{sprintf("%08d", batch_index)}")
    csv = Kiba::Common::Destinations::CSV.new(
      filename: filename,
      csv_options: { ... },
      headers: %w(my fields here)
    )
    rows.each { |r| csv.write(r) }
    csv.close
  }

您可以在生成文件后在 on_flush 块中触发您的 COPY(如果您希望立即开始上传),或者在 post_process 块中(但这只会在所有 CSV 准备就绪后才开始,如果您愿意,这可以作为确保某种形式的事务性全局上传的功能)。

如果你真的需要这个,你可以花点时间启动一个线程队列来实际并行处理上传(但要小心僵尸线程等)。

另一种方法是使用 "multiple steps" ETL 进程,一个脚本生成 CSV,另一个脚本选择它们进行上传,同时 运行ning(这是我在例如我在 RubyKaigi 2018 的演讲)。

让我知道您的工作方式!

Thibaut,我做了类似的事情,只是我将它流式传输到一个临时文件,我想...

require 'csv'

# @param limit [Integer, 1_000] Number of rows per csv file
# @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
#        each csv file is finished
module PacerPro
  class CSVDestination
    def initialize(limit: 1_000, callback: ->(obj) { })
      @limit = limit
      @callback = callback

      @csv = nil
      @row_count = 0
    end

    # @param row [Hash] returned from transforms
    def write(row)
      csv << row.values
      @row_count += 1
      return if row_count < limit

      self.close
    end

    # Called by Kiba when the transform pipeline is finished
    def close
      csv.close

      callback.call(csv)

      tempfile.unlink

      @csv = nil
      @row_count = 0
    end

    private

    attr_reader :limit, :callback
    attr_reader :row_count, :tempfile

    def csv
      @csv ||= begin
        @tempfile = Tempfile.new('csv')
        CSV.open(@tempfile, 'w')
      end
    end
  end
end