使用 python 在数据流中每个 window 写入一个文件

Write to one file per window in dataflow using python

从 pub/sub 等无限来源读取数据后,我正在应用 windowing。我需要将属于 window 的所有记录写入一个单独的文件。我在 Java 中找到了 this,但在 python 中找不到任何内容。

问题中没有关于您的用例的详细信息,因此您可能需要调整以下示例的某些部分。一种方法是使用元素所属的 window 作为键对元素进行分组。然后,我们利用 filesystems.FileSystems.create 来控制我们要如何写入文件。

这里我将使用 10s windows 和一些虚拟数据,其中每个事件间隔 4s。生成方式:

data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]

我们使用 timestamp 字段来分配元素时间戳(这只是为了以受控方式模拟 Pub/Sub 事件)。我们window事件,以windowing info为key,key分组,结果写入output文件夹:

events = (p
  | 'Create Events' >> beam.Create(data) \
  | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
  | 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
  | 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
  | 'Group By Window' >> beam.GroupByKey() \
  | 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))

其中 AddWindowingInfoFn 非常简单:

class AddWindowingInfoFn(beam.DoFn):
  """output tuple of window(key) + element(value)"""
  def process(self, element, window=beam.DoFn.WindowParam):
    yield (window, element)

WindowedWritesFn 写入我们在管道中指定的路径(在我的例子中是 output/ 文件夹)。然后,我使用 window 信息作为文件名。为方便起见,我将纪元时间戳转换为 human-readable 日期。最后,我们遍历所有元素并将它们写入相应的文件。当然这个行为可以在这个函数中随意调优:

class WindowedWritesFn(beam.DoFn):
    """write one file per window/key"""
    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        (window, elements) = element
        window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
        window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
        writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')

        for row in elements:
          writer.write(str(row)+ "\n")

        writer.close()

这会将属于每个 window 的元素写入不同的文件。在我的例子中有 5 个不同的

$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt

第一个只包含元素 0(这会因执行而异):

$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt 
{'timestamp': 1558465286.933727, 'event': '0'}

第二个包含元素1到3等等:

$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt 
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}

这种方法需要注意的是,来自同一个 window 的所有元素都被分组到同一个 worker 中。如果根据您的情况写入单个分片或输出文件,无论如何都会发生这种情况,但对于更高的负载,您可能需要考虑更大的机器类型。

完整代码here