How to run Apache Beam Python pipelines on Flink cluster in Kubernetes?

I am trying to run the word count example on minikube following the Flink Kubernetes instructions here, but the job never completes. The Python Beam SDK worker pool does not seem to be doing any work.

In addition to following the instructions for configuring a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python parts of the pipeline. See the full k8s deployment below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

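Not part of the original post, but a quick way to confirm the sidecar came up alongside the taskmanager is to inspect both containers in the pod (assuming kubectl points at the minikube cluster and the namespace above):

```shell
# List the taskmanager pod(s); READY should show 2/2 for the two containers
kubectl -n flink-test get pods -l app=flink,component=taskmanager

# Tail the worker pool sidecar's logs specifically
kubectl -n flink-test logs deploy/flink-taskmanager -c beam-worker-pool
```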
I run the example as follows:

python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the environment_type and environment_config values.

The job is added to the job manager and I can see it in the Flink UI, but it never completes. I started looking at the container logs and noticed that beam-worker-pool is logging the following:

Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options: <fields elided> logging_endpoint: <...> artifact_endpoint: <...> control_endpoint: <...> dependencies: <...>
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = 

Likewise, the task manager is logging:

2020-09-28 16:46:00,155 INFO  org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory  - Still waiting for startup of environment from localhost:50000 for worker id 1-1

Not sure what I am missing. I checked /tmp/staging/pickled_main_session on the worker pool and it was empty.
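One sanity check worth running while debugging "Still waiting for startup of environment" messages is to confirm that the endpoint passed as environment_config actually accepts TCP connections from where the taskmanager runs. A minimal sketch (the helper name is hypothetical, not part of Beam):

```python
import socket

def worker_pool_reachable(endpoint: str, timeout: float = 2.0) -> bool:
    """Return True if a host:port endpoint (e.g. 'localhost:50000')
    accepts TCP connections within the given timeout."""
    host, port = endpoint.rsplit(":", 1)
    try:
        # socket.timeout and ConnectionRefusedError are both OSError subclasses
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False
```

If this returns False from inside the taskmanager container, the problem is connectivity to the worker pool rather than artifact staging.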

Note that this question is similar to an earlier one: How do I run Beam Python pipelines using Flink deployed on Kubernetes?

By default (as of this writing), Beam stages runtime dependencies ("artifacts") to a directory (/tmp/staged by default) that must be accessible to both the job server (in your case, the client) and the Beam workers.

You can work around this by setting the --flink_submit_uber_jar pipeline option. With --flink_submit_uber_jar set, Beam wraps all of your dependencies into a single jar that is submitted to Flink.
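For example, the wordcount invocation from the question would become the following, with the flag as the only addition (same endpoints as above, assuming Beam 2.24.0 as in the worker image):

```shell
python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000 \
  --flink_submit_uber_jar
```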