Flink:使用 StreamingFileSink 时设置 ACL
Flink: Setting the ACL when using StreamingFileSink
我正在尝试将我的 Flink 作业(EMR 上的 v1.8 运行)从使用 BucketingSink 过渡到更新的 StreamingFileSink。
我得到了新代码 运行,几乎一切看起来都不错。文件被写入 S3 并过渡到完成。唯一的问题是 S3 的 ACL 没有像旧代码那样设置。
我的core-site.xml
设置成这样
<configuration>
<property>
<name>fs.s3a.acl.default</name>
<value>BucketOwnerFullControl</value>
</property>
</configuration>
我还在 StreamingFileSink 构建器的 forRowFormat()
参数中使用 s3a://
作为路径的前缀。
此外,当切换到 StreamingFileSink 时,我必须向我的 build.gradle
添加一个新的依赖项
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
当我使用 BucketingSink api 时,我不是很清楚我是如何在没有这个 jar 的情况下使用 s3a:// 前缀写入 S3 的。不知何故,我现在正在以一种不尊重我的核心-site.xml 设置的方式写入 S3。
我通过反复试验发现,将以下行添加到我的 flink-conf.yml
解决了这个问题。
fs.s3a.acl.default: BucketOwnerFullControl
我正在尝试将我的 Flink 作业(EMR 上的 v1.8 运行)从使用 BucketingSink 过渡到更新的 StreamingFileSink。
我得到了新代码 运行,几乎一切看起来都不错。文件被写入 S3 并过渡到完成。唯一的问题是 S3 的 ACL 没有像旧代码那样设置。
我的core-site.xml
设置成这样
<configuration>
<property>
<name>fs.s3a.acl.default</name>
<value>BucketOwnerFullControl</value>
</property>
</configuration>
我还在 StreamingFileSink 构建器的 forRowFormat()
参数中使用 s3a://
作为路径的前缀。
此外,当切换到 StreamingFileSink 时,我必须向我的 build.gradle
添加一个新的依赖项flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
当我使用 BucketingSink api 时,我不是很清楚我是如何在没有这个 jar 的情况下使用 s3a:// 前缀写入 S3 的。不知何故,我现在正在以一种不尊重我的核心-site.xml 设置的方式写入 S3。
我通过反复试验发现,将以下行添加到我的 flink-conf.yml
解决了这个问题。
fs.s3a.acl.default: BucketOwnerFullControl