内存高效的大型数据集流式传输到 S3

Memory-efficient large dataset streaming to S3

我正在尝试使用 SQL alchemy 复制 S3 大型数据集(大于 RAM)。 我的约束是:

  1. 我需要使用 sqlalchemy
  2. 我需要将内存压力保持在最低
  3. 我不想使用本地文件系统作为向 s3 发送数据的中间步骤

我只想以内存高效的方式将数据从数据库传输到 S3

我可以正常处理数据集(使用以下逻辑),但对于更大的数据集,我遇到了缓冲区问题。

我解决的第一个问题是执行查询通常会将结果缓冲在内存中。我使用 fetchmany() 方法。

engine = sqlalchemy.create_engine(db_url)
engine.execution_options(stream_results=True)

results=engine.execute('SELECT * FROM tableX;')
while True:
  chunk = result.fetchmany(10000)
  if not chunk:
    break

另一方面,我有一个 StringIO 缓冲区,我将它与 fetchmany 数据检查一起提供。然后我将它的内容发送到 s3.

from io import StringIO
import boto3
import csv

s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())

我遇到的问题本质上是一个设计问题,如何让这些部分协同工作。在同一个运行时甚至可能吗?

engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')

engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
    chunk = result.fetchmany(10000)
    csv_writer = csv.writer(csv_buffer, delimiter=';')
    csv_writer.writerows(chunk)
    s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
    if not chunk:
        break

我可以让它在 fetchmany 的一个周期内工作,但不能在多个周期内工作。有什么想法吗?

我假设 "make these parts work together" 你的意思是你想要 S3 中的单个文件而不是部分文件?您需要做的就是创建一个文件对象,该文件对象在读取时将为下一批发出查询并对其进行缓冲。我们可以使用 python 的生成器:

def _generate_chunks(engine):
    with engine.begin() as conn:
        conn = conn.execution_options(stream_results=True)
        results = conn.execute("")
        while True:
            chunk = results.fetchmany(10000)
            if not chunk:
                break
            csv_buffer = StringIO()
            csv_writer = csv.writer(csv_buffer, delimiter=';')
            csv_writer.writerows(chunk)
            yield csv_buffer.getvalue().encode("utf-8")

这是文件块的流,所以我们需要做的就是将它们拼接在一起(当然是懒惰地)到一个文件对象中:

class CombinedFile(io.RawIOBase):
    def __init__(self, strings):
        self._buffer = ""
        self._strings = iter(strings)

    def read(self, size=-1):
        if size < 0:
            return self.readall()
        if not self._buffer:
            try:
                self._buffer = next(self._strings)
            except StopIteration:
                pass
        if len(self._buffer) > size:
            ret, self._buffer = self._buffer[:size], self._buffer[size:]
        else:
            ret, self._buffer = self._buffer, b""
        return ret

chunks = _generate_chunks(engine)
file = CombinedFile(chunks)
upload_file_object_to_s3(file)

将文件对象流式传输到 S3 留作 reader 的练习。 (您可能可以使用 put_object。)