Flink checkpoints to Google Cloud Storage

I'm trying to configure checkpoints in GCS for a Flink job. Everything works fine if I run a test job locally (no Docker, no cluster setup), but it fails with the error below if I run it with docker-compose or a cluster setup and deploy the fat jar from the Flink dashboard.

Any ideas? Thanks!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

The environment is configured like this:

StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);
checkpointConfig.setCheckpointInterval(10000);
checkpointConfig.setMinPauseBetweenCheckpoints(5000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
env.setStateBackend((StateBackend) rocksDBStateBackend);

Here is my core-site.xml file:

<configuration>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${user.dir}/key.json</value>
</property>
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>The FileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.gs.application.name.suffix</name>
    <value>-kube-flink</value>
    <description>
        Appended to the user-agent header for API requests to GCS to help identify
        the traffic as coming from Dataproc.
    </description>
</property>
</configuration>

The dependency on gcs-connector:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>1.9.4-hadoop2</version>
</dependency>

UPDATE:

After some manipulation of the dependencies I am able to write checkpoints. My current setup is:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>1.5.1</version>
</dependency>

I also switched the Flink image to the flink:1.5.2-hadoop28 version.

Unfortunately I still cannot read the checkpoint data, as my job always fails to restore its state with the error:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

I believe this will be the last error...

The problem is with the implementation of the gs:// scheme, which is the protocol used to connect to GCS. If you add the following dependency, the Java program should be able to run:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-storage</artifactId>
  <version>1.35.0</version>
</dependency>

In this link you will see how to add this dependency for any other programming language.
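To sanity-check that this dependency and your credentials work outside of Flink, a minimal sketch could look like the following (the bucket and object names are placeholders, not taken from the question; authentication is assumed to come from Application Default Credentials):

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class GcsSmokeTest {
    public static void main(String[] args) {
        // Uses Application Default Credentials, e.g. GOOGLE_APPLICATION_CREDENTIALS pointing at key.json
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // Placeholder bucket/object names; replace with your own
        Blob blob = storage.get("checkpoints", "test-object");
        System.out.println(blob == null ? "object not found" : "found, size=" + blob.getSize());
    }
}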

Finally found a solution here.

You have to create your own image and put the gcs-connector into the lib directory. Otherwise you will always run into classloading issues (user code vs. system classloader).

To create a custom Docker image, we create the following Dockerfile:

FROM registry.platform.data-artisans.net/trial/v1.0/flink:1.4.2-dap1-scala_2.11

RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar && \
    wget http://ftp.fau.de/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    tar xf flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    mv flink-1.4.2/lib/flink-shaded-hadoop2* lib/ && \
    rm -r flink-1.4.2*

RUN mkdir etc-hadoop
COPY <name of key file>.json etc-hadoop/
COPY core-site.xml etc-hadoop/

ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["jobmanager"]

The Docker image will be based on the Flink image we’re providing as part of the dA Platform trial. We are adding the Google Cloud Storage connector, Flink’s Hadoop package and the key with the configuration file.

To build the custom image, the following files should be in your current directory: core-site.xml, Dockerfile and the key-file (.json).

To finally trigger the build of the custom image, we run the following command:

$ docker build -t flink-1.4.2-gs .

Once the image has been built, we will upload the image to Google’s Container Registry. To configure Docker to properly access the registry, run this command once:

$ gcloud auth configure-docker

Next, we’ll tag and upload the container:

$ docker tag flink-1.4.2-gs:latest eu.gcr.io/<your project id>/flink-1.4.2-gs
$ docker push eu.gcr.io/<your project id>/flink-1.4.2-gs

Once the upload is completed, we need to set the custom image for an Application Manager deployment. Send the following PATCH request:

PATCH /api/v1/deployments/<your AppMgr deployment id>
 spec:
   template:
     spec:
       flinkConfiguration:
         fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
       artifact:
         flinkImageRegistry: eu.gcr.io
         flinkImageRepository: <your project id>/flink-1.4.2-gs
         flinkImageTag: latest

Alternatively, use the following curl command:

$ curl -X PATCH --header 'Content-Type: application/yaml' --header 'Accept: application/yaml' -d '  spec: \ 
    template: \ 
      spec: \ 
        flinkConfiguration:
          fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
        artifact: \ 
          flinkImageRegistry: eu.gcr.io \ 
          flinkImageRepository: <your project id>/flink-1.4.2-gs \ 
          flinkImageTag: latest' 'http://localhost:8080/api/v1/deployments/<your AppMgr deployment id>'

With this change implemented, you'll be able to checkpoint to Google's Cloud Storage. When specifying the checkpoint directory, use a gs:// URI such as gs://<your bucket>/checkpoints. For savepoints, set the state.savepoints.dir Flink configuration option.
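For illustration only (the bucket name is a placeholder, not part of the original deployment), the relevant Flink configuration entries could look roughly like this:

state.backend: rocksdb
state.checkpoints.dir: gs://<your bucket>/checkpoints
state.savepoints.dir: gs://<your bucket>/savepoints
fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/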

This Dockerfile worked for me; the key is to use the right versions of the dependencies.

I based this solution on the flink k8s operator.

  1. Dockerfile:
ARG FLINK_VERSION=1.13.1
ARG SCALA_VERSION=2.12
FROM flink:${FLINK_VERSION}-scala_${SCALA_VERSION}-java8

ARG FLINK_HADOOP_VERSION=2.8.3-10.0
ARG GCS_CONNECTOR_VERSION=latest-hadoop2

ARG GCS_CONNECTOR_NAME=gcs-connector-${GCS_CONNECTOR_VERSION}.jar
ARG GCS_CONNECTOR_URI=https://storage.googleapis.com/hadoop-lib/gcs/${GCS_CONNECTOR_NAME}
ARG FLINK_HADOOP_JAR_NAME=flink-shaded-hadoop-2-uber-${FLINK_HADOOP_VERSION}.jar
ARG FLINK_HADOOP_JAR_URI=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${FLINK_HADOOP_VERSION}/${FLINK_HADOOP_JAR_NAME}

#COPY target/lib /opt/flink/lib

RUN echo "Downloading ${GCS_CONNECTOR_URI}" && \
  wget -q -O /opt/flink/lib/${GCS_CONNECTOR_NAME} ${GCS_CONNECTOR_URI}
RUN echo "Downloading ${FLINK_HADOOP_JAR_URI}" && \
  wget -q -O /opt/flink/lib/${FLINK_HADOOP_JAR_NAME} ${FLINK_HADOOP_JAR_URI}

COPY target/play-flink-1.0-SNAPSHOT.jar /opt/flink/usrlib/play-flink-1.0-SNAPSHOT.jar
  2. Create the core-site.xml file:
<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
        <description>The AbstractFileSystem for gs: uris.</description>
    </property>
    <property>
        <name>fs.gs.project.id</name>
        <value>projectName</value>
        <description>
            Optional. Google Cloud Project ID with access to GCS buckets.
            Required only for list buckets and create bucket operations.
        </description>
    </property>
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
        <description>
            Whether to use a service account for GCS authorization.
        </description>
    </property>
</configuration>
  3. Create core-site.xml as a configmap: kubectl create configmap hadoop-configmap --from-file core-site.xml

  4. Create a secret for the service account: kubectl create secret generic gcp-secret --from-file=key.json=${SERVICE_ACCOUNT_FILE}

  5. Load the configmap and the service account secret in job.yaml:
 volumeMounts:
            - name: hadoop-configmap-volume
              mountPath: /etc/hadoop/conf
            - name: google-cloud-key
              mountPath: /etc/gcp/keys
  ....
   volumes:
        - name: google-cloud-key
          secret:
            secretName: gcp-secret
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: hadoop-configmap-volume
          configMap:
              name: hadoop-configmap
              items:
                - key: core-site.xml
                  path: core-site.xml
  6. Configure the volume mount path in the Flink config: fs.hdfs.hadoopconf: /etc/hadoop/conf
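As a rough sketch (the bucket name is an assumption, not taken from the original setup), the flink-conf.yaml shipped in the flink-config configmap might contain:

fs.hdfs.hadoopconf: /etc/hadoop/conf
state.backend: rocksdb
state.checkpoints.dir: gs://<your bucket>/checkpoints
state.savepoints.dir: gs://<your bucket>/savepoints

If the connector does not pick up the mounted key automatically, the google.cloud.auth.service.account.json.keyfile property in core-site.xml can point at the secret mount path (/etc/gcp/keys/key.json in the job.yaml above); this is an assumption based on the mount shown here, not something stated in the original answer.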