Google 云数据流、TextIO 和 Kerberized HDFS
Google Cloud Dataflow, TextIO and Kerberized HDFS
我正在尝试在 Dataflow runner 上使用 Beam Java 2.22.0 从 kerberized HDFS 读取 TSV 文件。我使用带有 kerberos 组件的 Dataproc 集群来提供 kerberos 化的 HDFS。我得到的错误是:
Error message from worker: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
我正在按如下方式配置管道(请注意,我已经配置了 java.security.krb5.realm/kdc,我认为这对工作人员来说是不必要的 krb5.conf。我的 HdfsTextIOOptions
扩展了 HadoopFileSystemOptions
,这让我可以使用我的 Hadoop 配置来初始化管道。
我正在从 GCS 位置获取(当前未加密的)密钥表,并使用它来初始化 UserGroupInformation
。
public static void main(String[] args) throws IOException {
System.setProperty("java.security.krb5.realm", "MY_REALM");
System.setProperty("java.security.krb5.kdc", "my.kdc.hostname");
HdfsTextIOOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(
HdfsTextIOOptions.class);
Storage storage = StorageOptions.getDefaultInstance().getService();
URI uri = URI.create(options.getGcsKeytabPath());
System.err.println(String
.format("URI: %s, filesystem: %s, bucket: %s, filename: %s", uri.toString(),
uri.getScheme(), uri.getAuthority(),
uri.getPath()));
Blob keytabBlob = storage.get(BlobId.of(uri.getAuthority(),
uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath()));
Path localKeytabPath = Paths.get("/tmp", uri.getPath());
System.err.println(localKeytabPath);
keytabBlob.downloadTo(localKeytabPath);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://namenode:8020");
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation
.loginUserFromKeytab(options.getUserPrincipal(), localKeytabPath.toString());
UserGroupInformation.setConfiguration(conf);
options.setHdfsConfiguration(ImmutableList.of(conf));
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getInputFile()))
...
我是否缺少一些必要的配置来从 Dataflow 上的 Beam 正确访问 kerberized HDFS?
谢谢!
您似乎是在构建时在管道中设置系统属性。您还需要确保在管道执行期间设置这些属性。
一个简单的方法是编写您自己的 JvmInitializer 来设置这些属性。工作人员将使用 Java 的 ServiceLoader 实例化您的 JvmInitialier。
我正在尝试在 Dataflow runner 上使用 Beam Java 2.22.0 从 kerberized HDFS 读取 TSV 文件。我使用带有 kerberos 组件的 Dataproc 集群来提供 kerberos 化的 HDFS。我得到的错误是:
Error message from worker: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
我正在按如下方式配置管道(请注意,我已经配置了 java.security.krb5.realm/kdc,我认为这对工作人员来说是不必要的 krb5.conf。我的 HdfsTextIOOptions
扩展了 HadoopFileSystemOptions
,这让我可以使用我的 Hadoop 配置来初始化管道。
我正在从 GCS 位置获取(当前未加密的)密钥表,并使用它来初始化 UserGroupInformation
。
public static void main(String[] args) throws IOException {
System.setProperty("java.security.krb5.realm", "MY_REALM");
System.setProperty("java.security.krb5.kdc", "my.kdc.hostname");
HdfsTextIOOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(
HdfsTextIOOptions.class);
Storage storage = StorageOptions.getDefaultInstance().getService();
URI uri = URI.create(options.getGcsKeytabPath());
System.err.println(String
.format("URI: %s, filesystem: %s, bucket: %s, filename: %s", uri.toString(),
uri.getScheme(), uri.getAuthority(),
uri.getPath()));
Blob keytabBlob = storage.get(BlobId.of(uri.getAuthority(),
uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath()));
Path localKeytabPath = Paths.get("/tmp", uri.getPath());
System.err.println(localKeytabPath);
keytabBlob.downloadTo(localKeytabPath);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://namenode:8020");
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation
.loginUserFromKeytab(options.getUserPrincipal(), localKeytabPath.toString());
UserGroupInformation.setConfiguration(conf);
options.setHdfsConfiguration(ImmutableList.of(conf));
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getInputFile()))
...
我是否缺少一些必要的配置来从 Dataflow 上的 Beam 正确访问 kerberized HDFS?
谢谢!
您似乎是在构建时在管道中设置系统属性。您还需要确保在管道执行期间设置这些属性。
一个简单的方法是编写您自己的 JvmInitializer 来设置这些属性。工作人员将使用 Java 的 ServiceLoader 实例化您的 JvmInitialier。