How do you implement SASTokenProvider for per-container SAS token access?

According to the documentation, you can access Azure Data Lake Storage Gen2 with SAS tokens by implementing the org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider interface.

See the documentation here: https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sas-access
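
For orientation, the interface boils down to two methods. The skeleton below is only illustrative (the class name is mine); the method signatures match the implementations shown further down:

%scala
package com.foo

// Bare skeleton of a SASTokenProvider implementation. initialize() is called with the
// Hadoop configuration and the storage account name; getSASToken() is called per request
// with the account, container (fileSystem), path and operation, and must return a SAS token.
class SkeletonTokenProvider extends org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider {
  def initialize(configuration: org.apache.hadoop.conf.Configuration, accountName: String): Unit = ???
  def getSASToken(accountName: String, fileSystem: String, path: String, operation: String): String = ???
}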

However, only a mock implementation is provided, and it generates SAS tokens by accessing the primary account key directly:

    // From the mock implementation's initialize(): the primary storage account key is read
    // from the configuration and used to build a ServiceSASGenerator that signs SAS tokens.
    try {
      AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, accountName);
      accountKey = Base64.decode(abfsConfig.getStorageAccountKey());
    } catch (Exception ex) {
      throw new IOException(ex);
    }
    generator = new ServiceSASGenerator(accountKey);

When the code is running in a context that already has access to the primary account key, restricting access by handing out SAS tokens is completely pointless, so this is clearly a mock implementation intended for testing only.

So how would a proper implementation of this class be written?

Here is the most basic and simplest version of a solution, which proves that it can be done:

%scala
package com.foo

class CustomTokenProvider extends org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider {
  // Returns the same hard-coded SAS token for every request; the elided value stands in
  // for a real, pre-generated token.
  def getSASToken(accountName: String, fileSystem: String, path: String, operation: String): String = {
    "sp=...etc etc"
  }
  // Nothing to initialize in this minimal version.
  def initialize(configuration: org.apache.hadoop.conf.Configuration, accountName: String): Unit = {
  }
}

...

spark.conf.set("fs.azure.account.auth.type.STORAGE_ACC.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.STORAGE_ACC.dfs.core.windows.net", "com.foo.CustomTokenProvider")
dbutils.fs.ls("abfss://sandbox@STORAGE_ACC.dfs.core.windows.net/")
> [FileInfo(path='abfss://sandbox@STORAGE_ACC.dfs.core.windows.net/t.py', name='t.py', size=112)]
dbutils.fs.ls("abfss://restricted@STORAGE_ACC.dfs.core.windows.net/")
> Operation failed: "Server failed to authenticate the request.

However, how can this implementation be changed to pull the key from a secret store rather than hard-coding it?

Note in particular the reflective access to DBUtils:

%scala
package com.foo

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams

import scala.util.Try

import scala.language.implicitConversions
import scala.language.reflectiveCalls

trait DBUtilsApi {
    type SecretUtils
    type SecretMetadata
    type SecretScope
    val secrets: SecretUtils
}

object ReflectiveDBUtils extends DBUtilsApi {
    
    // Resolve the Databricks DBUtils singleton via runtime reflection, so this class has
    // no compile-time dependency on Databricks libraries.
    private lazy val dbutils: DBUtils =
        Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get(null).asInstanceOf[DBUtils]

    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = AnyRef {
        val secrets: SecretUtils
    }

    type SecretUtils = AnyRef {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    type SecretMetadata = DefinedByConstructorParams { val key: String }

    type SecretScope = DefinedByConstructorParams { val name: String }
}

class VaultTokenProvider extends org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider {
  // Read the SAS token from a Databricks secret scope instead of hard-coding it here.
  def getSASToken(accountName: String, fileSystem: String, path: String, operation: String): String = {
    ReflectiveDBUtils.secrets.get("scope", "SECRET")
  }
  def initialize(configuration: org.apache.hadoop.conf.Configuration, accountName: String): Unit = {
  }
}

...

spark.conf.set("fs.azure.account.auth.type.bidbtests.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.bidbtests.dfs.core.windows.net", "com.foo.VaultTokenProvider")

This is just a basic example; a real solution would select the correct secret based on the account and file system.
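
As a sketch of that last point, getSASToken could pick the secret based on the container (fileSystem) and account it is called for. Everything below is illustrative: the class name, the secret scope "sas-tokens" and the "<account>-<container>" key convention are assumptions, and the code reuses the ReflectiveDBUtils object from the example above.

%scala
package com.foo

// Hypothetical sketch: resolves one secret per container via the ReflectiveDBUtils object
// defined earlier. The scope name and key naming convention are assumptions for illustration.
class PerContainerTokenProvider extends org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider {
  private val secretScope = "sas-tokens" // assumed secret scope name

  def getSASToken(accountName: String, fileSystem: String, path: String, operation: String): String = {
    // accountName may arrive as a fully qualified host name; keep only the account part.
    val account = accountName.split('.').head
    // Look up the SAS token stored for this specific account/container pair.
    ReflectiveDBUtils.secrets.get(secretScope, s"$account-$fileSystem")
  }

  def initialize(configuration: org.apache.hadoop.conf.Configuration, accountName: String): Unit = {
  }
}

It would be wired up exactly like the VaultTokenProvider above, just pointing fs.azure.sas.token.provider.type for the account at com.foo.PerContainerTokenProvider.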