为 WritetoFiles 设置文件名
Set filename for WritetoFiles
我的流程将文件存储在磁盘上,我需要设置文件名以便找回内容。
默认命名是 window 时间戳和一个计数器,这对我没有帮助。文档对我来说不够清楚。 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html?highlight=default_file_naming)
fileio.WriteToFiles(archive_storage, file_naming=beam.io.fileio.destination_prefix_naming())
我想将文件命名为 <HASH>.json
,其中 HASH 是文件中数据的哈希值。
多亏了 this example,我才能够得到一个有效的片段。在这种情况下,我们将根据每个记录的哈希值为每个记录指定不同的 destination
,因为我们希望将每个元素写入不同的文件。此外,我们将传递名为 hash_naming
:
的自定义命名函数
data = [{'id': 0, 'message': 'hello'},
{'id': 1, 'message': 'world'}]
(p
| 'Create Events' >> beam.Create(data) \
| 'JSONify' >> beam.Map(json.dumps) \
| 'Print Hashes' >> beam.ParDo(PrintHashFn()) \
| 'Write Files' >> fileio.WriteToFiles(
path='./output',
destination=lambda record: hash(record),
sink=lambda dest: JsonSink(),
file_naming=hash_naming))
在 PrintHashFn
中,我们将使用每个散列记录每个元素:
logging.info("Element: %s with hash %s", element, hash(element))
这样,对于我们的数据,我们将得到:
INFO:root:Element: {"message": "hello", "id": 0} with hash -1885604661473532601
INFO:root:Element: {"message": "world", "id": 1} with hash 9144125507731048840
可能有更好的方法,但我发现调用 fileio.destination_prefix_naming()(*args)
我们可以从默认命名方案 (-1885604661473532601----00000-00001
) 中检索目的地 (-1885604661473532601
):
def hash_naming(*args):
file_name = fileio.destination_prefix_naming()(*args) # -1885604661473532601----00000-00001
destination = file_name.split('----')[0] # -1885604661473532601
return '{}.json'.format(destination) # -1885604661473532601.json
请注意,如果您将窗口添加到混合中,则获取子字符串的拆分可能会有所不同。
运行 使用 2.16.0 SDK 和 DirectRunner
的脚本我得到以下输出:
$ ls output/
-1885604661473532601.json 9144125507731048840.json
$ cat output/-1885604661473532601.json
"{\"message\": \"hello\", \"id\": 0}"
已更新完整代码 here。
我的流程将文件存储在磁盘上,我需要设置文件名以便找回内容。
默认命名是 window 时间戳和一个计数器,这对我没有帮助。文档对我来说不够清楚。 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html?highlight=default_file_naming)
fileio.WriteToFiles(archive_storage, file_naming=beam.io.fileio.destination_prefix_naming())
我想将文件命名为 <HASH>.json
,其中 HASH 是文件中数据的哈希值。
多亏了 this example,我才能够得到一个有效的片段。在这种情况下,我们将根据每个记录的哈希值为每个记录指定不同的 destination
,因为我们希望将每个元素写入不同的文件。此外,我们将传递名为 hash_naming
:
data = [{'id': 0, 'message': 'hello'},
{'id': 1, 'message': 'world'}]
(p
| 'Create Events' >> beam.Create(data) \
| 'JSONify' >> beam.Map(json.dumps) \
| 'Print Hashes' >> beam.ParDo(PrintHashFn()) \
| 'Write Files' >> fileio.WriteToFiles(
path='./output',
destination=lambda record: hash(record),
sink=lambda dest: JsonSink(),
file_naming=hash_naming))
在 PrintHashFn
中,我们将使用每个散列记录每个元素:
logging.info("Element: %s with hash %s", element, hash(element))
这样,对于我们的数据,我们将得到:
INFO:root:Element: {"message": "hello", "id": 0} with hash -1885604661473532601
INFO:root:Element: {"message": "world", "id": 1} with hash 9144125507731048840
可能有更好的方法,但我发现调用 fileio.destination_prefix_naming()(*args)
我们可以从默认命名方案 (-1885604661473532601----00000-00001
) 中检索目的地 (-1885604661473532601
):
def hash_naming(*args):
file_name = fileio.destination_prefix_naming()(*args) # -1885604661473532601----00000-00001
destination = file_name.split('----')[0] # -1885604661473532601
return '{}.json'.format(destination) # -1885604661473532601.json
请注意,如果您将窗口添加到混合中,则获取子字符串的拆分可能会有所不同。
运行 使用 2.16.0 SDK 和 DirectRunner
的脚本我得到以下输出:
$ ls output/
-1885604661473532601.json 9144125507731048840.json
$ cat output/-1885604661473532601.json
"{\"message\": \"hello\", \"id\": 0}"
已更新完整代码 here。