Beam launching flink 什么environment_config

What environment_config for Beam launching flink

我希望在 运行 设置 Beam wordcount.py 演示时获得有关如何设置 --environment_config 的指导。

运行DirectRunner 没问题。 Flink 的 wordcount 也 运行 很好(即 ​​运行ning Flink via flink run)。

我想 运行 Beam 使用 Flink 运行ner 使用 "seperate Flink cluster" 如 beam documentation 中所述。我不会用Docker,所以我打算用--environment_type=PROCESS

我在 python 代码中使用以下代码来设置 environment_config:

  environment_config = dict()
  environment_config['os'] = platform.system().lower()
  environment_config['arch'] = platform.machine()
  environment_config['command'] = 'ls'
  ec = "--environment_config={}".format(json.dumps(environment_config))

显然命令不正确。当我 运行 这样做时,Flink 确实接收并成功处理了 DataSource 子任务。它最终在 CHAIN MapPartition 秒超时。

有人可以提供有关如何设置 environment_config 的指导(或链接)吗?我是奇点容器中的 运行ning Beam。

对于environment_type=DOCKER,几乎所有的事情都为您准备好了,但是在进程模式下您必须自己做很多设置。您要查找的命令是 sdks/python/container/build/target/launcher/linux_amd64/boot。您将需要同时拥有该可执行文件(您可以使用 ./gradlew :sdks:python:container:build 从源代码构建)和 Python 安装,包括 Beam 和所有工作机器上的其他依赖项。

我知道的最好的例子是:https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165