为 WritetoFiles 设置文件名

Set filename for WritetoFiles

我的流程将文件存储在磁盘上,我需要设置文件名以便找回内容。

默认命名是 window 时间戳和一个计数器,这对我没有帮助。文档对我来说不够清楚。 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html?highlight=default_file_naming)

fileio.WriteToFiles(archive_storage, file_naming=beam.io.fileio.destination_prefix_naming())

我想将文件命名为 <HASH>.json,其中 HASH 是文件中数据的哈希值。

多亏了 this example,我才能够得到一个有效的片段。在这种情况下,我们将根据每个记录的哈希值为每个记录指定不同的 destination,因为我们希望将每个元素写入不同的文件。此外,我们将传递名为 hash_naming:

的自定义命名函数
data = [{'id': 0, 'message': 'hello'},
        {'id': 1, 'message': 'world'}]

(p
  | 'Create Events' >> beam.Create(data) \
  | 'JSONify' >> beam.Map(json.dumps) \
  | 'Print Hashes' >> beam.ParDo(PrintHashFn()) \
  | 'Write Files' >> fileio.WriteToFiles(
      path='./output',
      destination=lambda record: hash(record),
      sink=lambda dest: JsonSink(),
      file_naming=hash_naming))

PrintHashFn 中,我们将使用每个散列记录每个元素:

logging.info("Element: %s with hash %s", element, hash(element))

这样,对于我们的数据,我们将得到:

INFO:root:Element: {"message": "hello", "id": 0} with hash -1885604661473532601
INFO:root:Element: {"message": "world", "id": 1} with hash 9144125507731048840

可能有更好的方法,但我发现调用 fileio.destination_prefix_naming()(*args) 我们可以从默认命名方案 (-1885604661473532601----00000-00001) 中检索目的地 (-1885604661473532601):

def hash_naming(*args):
  file_name = fileio.destination_prefix_naming()(*args)  # -1885604661473532601----00000-00001
  destination = file_name.split('----')[0]  # -1885604661473532601
  return '{}.json'.format(destination)  # -1885604661473532601.json

请注意,如果您将窗口添加到混合中,则获取子字符串的拆分可能会有所不同。

运行 使用 2.16.0 SDK 和 DirectRunner 的脚本我得到以下输出:

$ ls output/
-1885604661473532601.json  9144125507731048840.json
$ cat output/-1885604661473532601.json 
"{\"message\": \"hello\", \"id\": 0}"

已更新完整代码 here