Flink:使用 StreamingFileSink 时设置 ACL

Flink: Setting the ACL when using StreamingFileSink

我正在尝试将我的 Flink 作业(EMR 上的 v1.8 运行)从使用 BucketingSink 过渡到更新的 StreamingFileSink。

我得到了新代码 运行,几乎一切看起来都不错。文件被写入 S3 并过渡到完成。唯一的问题是 S3 的 ACL 没有像旧代码那样设置。

我的core-site.xml设置成这样

<configuration>

    <property>
        <name>fs.s3a.acl.default</name>
        <value>BucketOwnerFullControl</value>
    </property>

</configuration>

我还在 StreamingFileSink 构建器的 forRowFormat() 参数中使用 s3a:// 作为路径的前缀。

此外,当切换到 StreamingFileSink 时,我必须向我的 build.gradle

添加一个新的依赖项
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"

当我使用 BucketingSink api 时,我不是很清楚我是如何在没有这个 jar 的情况下使用 s3a:// 前缀写入 S3 的。不知何故,我现在正在以一种不尊重我的核心-site.xml 设置的方式写入 S3。

我通过反复试验发现,将以下行添加到我的 flink-conf.yml 解决了这个问题。

fs.s3a.acl.default: BucketOwnerFullControl