内存高效的大型数据集流式传输到 S3
Memory-efficient large dataset streaming to S3
我正在尝试使用 SQL alchemy 复制 S3 大型数据集(大于 RAM)。
我的约束是:
- 我需要使用 sqlalchemy
- 我需要将内存压力保持在最低
- 我不想使用本地文件系统作为向 s3 发送数据的中间步骤
我只想以内存高效的方式将数据从数据库传输到 S3。
我可以正常处理数据集(使用以下逻辑),但对于更大的数据集,我遇到了缓冲区问题。
我解决的第一个问题是执行查询通常会将结果缓冲在内存中。我使用 fetchmany() 方法。
engine = sqlalchemy.create_engine(db_url)
engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
if not chunk:
break
另一方面,我有一个 StringIO 缓冲区,我将它与 fetchmany 数据检查一起提供。然后我将它的内容发送到 s3.
from io import StringIO
import boto3
import csv
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
我遇到的问题本质上是一个设计问题,如何让这些部分协同工作。在同一个运行时甚至可能吗?
engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
if not chunk:
break
我可以让它在 fetchmany 的一个周期内工作,但不能在多个周期内工作。有什么想法吗?
我假设 "make these parts work together" 你的意思是你想要 S3 中的单个文件而不是部分文件?您需要做的就是创建一个文件对象,该文件对象在读取时将为下一批发出查询并对其进行缓冲。我们可以使用 python 的生成器:
def _generate_chunks(engine):
with engine.begin() as conn:
conn = conn.execution_options(stream_results=True)
results = conn.execute("")
while True:
chunk = results.fetchmany(10000)
if not chunk:
break
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
yield csv_buffer.getvalue().encode("utf-8")
这是文件块的流,所以我们需要做的就是将它们拼接在一起(当然是懒惰地)到一个文件对象中:
class CombinedFile(io.RawIOBase):
def __init__(self, strings):
self._buffer = ""
self._strings = iter(strings)
def read(self, size=-1):
if size < 0:
return self.readall()
if not self._buffer:
try:
self._buffer = next(self._strings)
except StopIteration:
pass
if len(self._buffer) > size:
ret, self._buffer = self._buffer[:size], self._buffer[size:]
else:
ret, self._buffer = self._buffer, b""
return ret
chunks = _generate_chunks(engine)
file = CombinedFile(chunks)
upload_file_object_to_s3(file)
将文件对象流式传输到 S3 留作 reader 的练习。 (您可能可以使用 put_object
。)
我正在尝试使用 SQL alchemy 复制 S3 大型数据集(大于 RAM)。 我的约束是:
- 我需要使用 sqlalchemy
- 我需要将内存压力保持在最低
- 我不想使用本地文件系统作为向 s3 发送数据的中间步骤
我只想以内存高效的方式将数据从数据库传输到 S3。
我可以正常处理数据集(使用以下逻辑),但对于更大的数据集,我遇到了缓冲区问题。
我解决的第一个问题是执行查询通常会将结果缓冲在内存中。我使用 fetchmany() 方法。
engine = sqlalchemy.create_engine(db_url)
engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
if not chunk:
break
另一方面,我有一个 StringIO 缓冲区,我将它与 fetchmany 数据检查一起提供。然后我将它的内容发送到 s3.
from io import StringIO
import boto3
import csv
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
我遇到的问题本质上是一个设计问题,如何让这些部分协同工作。在同一个运行时甚至可能吗?
engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
if not chunk:
break
我可以让它在 fetchmany 的一个周期内工作,但不能在多个周期内工作。有什么想法吗?
我假设 "make these parts work together" 你的意思是你想要 S3 中的单个文件而不是部分文件?您需要做的就是创建一个文件对象,该文件对象在读取时将为下一批发出查询并对其进行缓冲。我们可以使用 python 的生成器:
def _generate_chunks(engine):
with engine.begin() as conn:
conn = conn.execution_options(stream_results=True)
results = conn.execute("")
while True:
chunk = results.fetchmany(10000)
if not chunk:
break
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
yield csv_buffer.getvalue().encode("utf-8")
这是文件块的流,所以我们需要做的就是将它们拼接在一起(当然是懒惰地)到一个文件对象中:
class CombinedFile(io.RawIOBase):
def __init__(self, strings):
self._buffer = ""
self._strings = iter(strings)
def read(self, size=-1):
if size < 0:
return self.readall()
if not self._buffer:
try:
self._buffer = next(self._strings)
except StopIteration:
pass
if len(self._buffer) > size:
ret, self._buffer = self._buffer[:size], self._buffer[size:]
else:
ret, self._buffer = self._buffer, b""
return ret
chunks = _generate_chunks(engine)
file = CombinedFile(chunks)
upload_file_object_to_s3(file)
将文件对象流式传输到 S3 留作 reader 的练习。 (您可能可以使用 put_object
。)