如何限制使用 FileIO 写入的每个文件的行数
How to limit number of lines per file written using FileIO
是否有可能使用 TextIO 或 FileIO 来限制每个写入分片中的行数?
示例:
- 从 Big Query - Batch Job 读取行(例如,结果为 19500 行)。
- 进行一些改造。
- 写入文件到Google云存储(19个文件,每个文件限制1000条记录,一个文件500条记录)。
- Cloud Function 被触发为 GCS 中的每个文件向外部 API 发出 POST 请求。
这是我目前尝试做的但没有用的(试图限制每个文件 1000 行):
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query,
use_standard_sql=True)) | beam.Map(json.dumps)
BQ_DATA | beam.WindowInto(GlobalWindows(), Repeatedly(trigger=AfterCount(1000)),
accumulation_mode=AccumulationMode.DISCARDING)
| WriteToFiles(path='fileio', destination="csv")
我的概念是错误的还是有其他方法可以实现?
您可以在 ParDo 中实现写入 GCS 步骤,并限制要包含在“批次”中的元素数量,如下所示:
from apache_beam.io import filesystems
class WriteToGcsWithRowLimit(beam.DoFn):
def __init__(self, row_size=1000):
self.row_size = row_size
self.rows = []
def finish_bundle(self):
if len(self.rows) > 0:
self._write_file()
def process(self, element):
self.rows.append(element)
if len(self.rows) >= self.row_size:
self._write_file()
def _write_file(self):
from time import time
new_file = 'gs://bucket/file-{}.csv'.format(time())
writer = filesystems.FileSystems.create(path=new_file)
writer.write(self.rows) # may need to format
self.rows = []
writer.close()
BQ_DATA | beam.ParDo(WriteToGcsWithRowLimit())
请注意,这不会创建任何少于 1000 行的文件,但您可以更改 process
中的逻辑来执行此操作。
(编辑1处理余数)
(编辑 2 以停止使用计数器,因为文件将被覆盖)
是否有可能使用 TextIO 或 FileIO 来限制每个写入分片中的行数?
示例:
- 从 Big Query - Batch Job 读取行(例如,结果为 19500 行)。
- 进行一些改造。
- 写入文件到Google云存储(19个文件,每个文件限制1000条记录,一个文件500条记录)。
- Cloud Function 被触发为 GCS 中的每个文件向外部 API 发出 POST 请求。
这是我目前尝试做的但没有用的(试图限制每个文件 1000 行):
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query,
use_standard_sql=True)) | beam.Map(json.dumps)
BQ_DATA | beam.WindowInto(GlobalWindows(), Repeatedly(trigger=AfterCount(1000)),
accumulation_mode=AccumulationMode.DISCARDING)
| WriteToFiles(path='fileio', destination="csv")
我的概念是错误的还是有其他方法可以实现?
您可以在 ParDo 中实现写入 GCS 步骤,并限制要包含在“批次”中的元素数量,如下所示:
from apache_beam.io import filesystems
class WriteToGcsWithRowLimit(beam.DoFn):
def __init__(self, row_size=1000):
self.row_size = row_size
self.rows = []
def finish_bundle(self):
if len(self.rows) > 0:
self._write_file()
def process(self, element):
self.rows.append(element)
if len(self.rows) >= self.row_size:
self._write_file()
def _write_file(self):
from time import time
new_file = 'gs://bucket/file-{}.csv'.format(time())
writer = filesystems.FileSystems.create(path=new_file)
writer.write(self.rows) # may need to format
self.rows = []
writer.close()
BQ_DATA | beam.ParDo(WriteToGcsWithRowLimit())
请注意,这不会创建任何少于 1000 行的文件,但您可以更改 process
中的逻辑来执行此操作。
(编辑1处理余数)
(编辑 2 以停止使用计数器,因为文件将被覆盖)