我正在尝试使用 assumeRole 通过 FileIO 和 ParquetIO 写入 Amazon S3
I am trying to write to Amazon S3 using assumeRole via FileIO with ParquetIO
第 1 步:担任角色
public static AWSCredentialsProvider getCredentials() {
if (roleARN.length() > 0) {
STSAssumeRoleSessionCredentialsProvider credentialsProvider = new STSAssumeRoleSessionCredentialsProvider
.Builder(roleARN, Constants.SESSION_NAME)
.withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
.build();
return credentialsProvider;
}
return new ProfileCredentialsProvider();
}
第 2 步:将凭据设置为管道
credentials = getCredentials();
pipeline.getOptions().as(AwsOptions.class).setAwsRegion(Regions.US_WEST_2.getName());
pipeline.getOptions().as(AwsOptions.class).setAwsCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(credentials.getCredentials().getAWSAccessKeyId(), credentials.getCredentials().getAWSAccessKeyId())));
第 3 步:运行 写入 s3 的管道
PCollection<GenericRecord> parquetRecord = formattedEvent
.apply("ParquetRecord", ParDo.of(new ParquetWriter()))
.setCoder(AvroCoder.of(getOutput_schema()));
parquetRecord.apply(FileIO.<GenericRecord, GenericRecord>writeDynamic()
.by(elm -> elm)
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath).withNumShards(1)
.withNaming(type -> FileNaming.getNaming("part", ".snappy.parquet", "" + DateTime.now().getMillisOfSecond()))
.withDestinationCoder(AvroCoder.of(getOutput_schema())));
我正在使用 'org.apache.beam:beam-sdks-java-io-parquet:jar:2.22.0'
并且
'org.apache.beam:beam-sdks-java-io-amazon-web-services:jar:2.22.0'
问题:目前 assumeRole 似乎不起作用。
错误:
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records.
或
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'awsCredentialsProvider' with value 'com.amazonaws.auth.InstanceProfileCredentialsProvider@71262020'
你从哪里 运行 这个管道(在 AWS 账户中?)如果是,那么最好提供对 运行 管道的角色的承担角色访问,然后从管道文件 IO 将只使用默认的 AWS 客户端。
最好将承担角色操作移出管道,只允许 S3 权限访问管道中的角色运行。
最近发布的beam(2.24.0)有承担角色的功能。
第 1 步:担任角色
public static AWSCredentialsProvider getCredentials() {
if (roleARN.length() > 0) {
STSAssumeRoleSessionCredentialsProvider credentialsProvider = new STSAssumeRoleSessionCredentialsProvider
.Builder(roleARN, Constants.SESSION_NAME)
.withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
.build();
return credentialsProvider;
}
return new ProfileCredentialsProvider();
}
第 2 步:将凭据设置为管道
credentials = getCredentials();
pipeline.getOptions().as(AwsOptions.class).setAwsRegion(Regions.US_WEST_2.getName());
pipeline.getOptions().as(AwsOptions.class).setAwsCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(credentials.getCredentials().getAWSAccessKeyId(), credentials.getCredentials().getAWSAccessKeyId())));
第 3 步:运行 写入 s3 的管道
PCollection<GenericRecord> parquetRecord = formattedEvent
.apply("ParquetRecord", ParDo.of(new ParquetWriter()))
.setCoder(AvroCoder.of(getOutput_schema()));
parquetRecord.apply(FileIO.<GenericRecord, GenericRecord>writeDynamic()
.by(elm -> elm)
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath).withNumShards(1)
.withNaming(type -> FileNaming.getNaming("part", ".snappy.parquet", "" + DateTime.now().getMillisOfSecond()))
.withDestinationCoder(AvroCoder.of(getOutput_schema())));
我正在使用 'org.apache.beam:beam-sdks-java-io-parquet:jar:2.22.0'
并且
'org.apache.beam:beam-sdks-java-io-amazon-web-services:jar:2.22.0'
问题:目前 assumeRole 似乎不起作用。
错误:
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records.
或
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'awsCredentialsProvider' with value 'com.amazonaws.auth.InstanceProfileCredentialsProvider@71262020'
你从哪里 运行 这个管道(在 AWS 账户中?)如果是,那么最好提供对 运行 管道的角色的承担角色访问,然后从管道文件 IO 将只使用默认的 AWS 客户端。
最好将承担角色操作移出管道,只允许 S3 权限访问管道中的角色运行。
最近发布的beam(2.24.0)有承担角色的功能。