使用 python 在数据流中每个 window 写入一个文件
Write to one file per window in dataflow using python
从 pub/sub 等无限来源读取数据后,我正在应用 windowing。我需要将属于 window 的所有记录写入一个单独的文件。我在 Java 中找到了 this,但在 python 中找不到任何内容。
问题中没有关于您的用例的详细信息,因此您可能需要调整以下示例的某些部分。一种方法是使用元素所属的 window 作为键对元素进行分组。然后,我们利用 filesystems.FileSystems.create
来控制我们要如何写入文件。
这里我将使用 10s windows 和一些虚拟数据,其中每个事件间隔 4s。生成方式:
data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]
我们使用 timestamp
字段来分配元素时间戳(这只是为了以受控方式模拟 Pub/Sub 事件)。我们window事件,以windowing info为key,key分组,结果写入output
文件夹:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
| 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
| 'Group By Window' >> beam.GroupByKey() \
| 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))
其中 AddWindowingInfoFn
非常简单:
class AddWindowingInfoFn(beam.DoFn):
"""output tuple of window(key) + element(value)"""
def process(self, element, window=beam.DoFn.WindowParam):
yield (window, element)
和 WindowedWritesFn
写入我们在管道中指定的路径(在我的例子中是 output/
文件夹)。然后,我使用 window 信息作为文件名。为方便起见,我将纪元时间戳转换为 human-readable 日期。最后,我们遍历所有元素并将它们写入相应的文件。当然这个行为可以在这个函数中随意调优:
class WindowedWritesFn(beam.DoFn):
"""write one file per window/key"""
def __init__(self, outdir):
self.outdir = outdir
def process(self, element):
(window, elements) = element
window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')
for row in elements:
writer.write(str(row)+ "\n")
writer.close()
这会将属于每个 window 的元素写入不同的文件。在我的例子中有 5 个不同的
$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt
第一个只包含元素 0(这会因执行而异):
$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt
{'timestamp': 1558465286.933727, 'event': '0'}
第二个包含元素1到3等等:
$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}
这种方法需要注意的是,来自同一个 window 的所有元素都被分组到同一个 worker 中。如果根据您的情况写入单个分片或输出文件,无论如何都会发生这种情况,但对于更高的负载,您可能需要考虑更大的机器类型。
完整代码here
从 pub/sub 等无限来源读取数据后,我正在应用 windowing。我需要将属于 window 的所有记录写入一个单独的文件。我在 Java 中找到了 this,但在 python 中找不到任何内容。
问题中没有关于您的用例的详细信息,因此您可能需要调整以下示例的某些部分。一种方法是使用元素所属的 window 作为键对元素进行分组。然后,我们利用 filesystems.FileSystems.create
来控制我们要如何写入文件。
这里我将使用 10s windows 和一些虚拟数据,其中每个事件间隔 4s。生成方式:
data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]
我们使用 timestamp
字段来分配元素时间戳(这只是为了以受控方式模拟 Pub/Sub 事件)。我们window事件,以windowing info为key,key分组,结果写入output
文件夹:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
| 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
| 'Group By Window' >> beam.GroupByKey() \
| 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))
其中 AddWindowingInfoFn
非常简单:
class AddWindowingInfoFn(beam.DoFn):
"""output tuple of window(key) + element(value)"""
def process(self, element, window=beam.DoFn.WindowParam):
yield (window, element)
和 WindowedWritesFn
写入我们在管道中指定的路径(在我的例子中是 output/
文件夹)。然后,我使用 window 信息作为文件名。为方便起见,我将纪元时间戳转换为 human-readable 日期。最后,我们遍历所有元素并将它们写入相应的文件。当然这个行为可以在这个函数中随意调优:
class WindowedWritesFn(beam.DoFn):
"""write one file per window/key"""
def __init__(self, outdir):
self.outdir = outdir
def process(self, element):
(window, elements) = element
window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')
for row in elements:
writer.write(str(row)+ "\n")
writer.close()
这会将属于每个 window 的元素写入不同的文件。在我的例子中有 5 个不同的
$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt
第一个只包含元素 0(这会因执行而异):
$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt
{'timestamp': 1558465286.933727, 'event': '0'}
第二个包含元素1到3等等:
$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}
这种方法需要注意的是,来自同一个 window 的所有元素都被分组到同一个 worker 中。如果根据您的情况写入单个分片或输出文件,无论如何都会发生这种情况,但对于更高的负载,您可能需要考虑更大的机器类型。
完整代码here