How to run Apache Beam Python pipelines on a Flink cluster in Kubernetes?
I'm trying to run the word count example on minikube following the Flink Kubernetes instructions here, but the job never completes. The Python Beam SDK worker pool doesn't appear to do any work.
In addition to following the instructions for setting up a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. As I understand it, the worker pool's purpose is to execute the Python parts of the pipeline. The full k8s deployment is below.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
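One detail worth noting: in --environment_config=localhost:50000 the address is resolved by the taskmanager inside the pod, where the beam-worker-pool sidecar shares the pod network, while --flink_master=localhost:8081 is resolved on my machine, so the Flink REST port has to be forwarded there. A minimal sketch of that setup, assuming the flink-jobmanager service name used by the Flink Kubernetes instructions (adjust if yours differs):

# Hypothetical port-forward so the local client can reach --flink_master=localhost:8081;
# "flink-jobmanager" is the service name from the Flink Kubernetes instructions.
kubectl -n flink-test port-forward service/flink-jobmanager 8081:8081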
I run the example as follows:
python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000
I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the environment_type and environment_config values.
The job is submitted to the job manager and I can see it in the Flink UI, but it never completes. Looking at the container logs, I noticed the following in the beam-worker-pool log:
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options:<...> logging_endpoint:<...> artifact_endpoint:<...> control_endpoint:<...> dependencies:<...>
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc =
Meanwhile, the task manager is logging:
2020-09-28 16:46:00,155 INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory - Still waiting for startup of environment from localhost:50000 for worker id 1-1
I'm not sure what I'm missing. I checked /tmp/staging/pickled_main_session on the worker pool and it is empty.
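For anyone reproducing this, the staging directory can be inspected in place on both containers; a sketch, using the deployment and container names from the manifest above:

# Hypothetical inspection commands; the deployment/container names come from the manifest above.
kubectl -n flink-test exec deploy/flink-taskmanager -c beam-worker-pool -- ls -la /tmp/staged
kubectl -n flink-test exec deploy/flink-taskmanager -c taskmanager -- ls -la /tmp/staged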
Note that this question is similar to an earlier question:
How do I run Beam Python pipelines using Flink deployed on Kubernetes?
By default (as of this writing), Beam stages its runtime dependencies ("artifacts") in a directory (/tmp/staged by default) that must be accessible both to the job server (in your case, the client) and to the Beam workers.
You can get around this requirement by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam wraps all of your dependencies in a single jar that is submitted along with the job to Flink, so no shared staging directory is needed.
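For example, the word count invocation from the question becomes the following; this is the same command with only the extra flag added:

python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000 \
  --flink_submit_uber_jar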