Access environment variables from Spark Worker

I have an application that needs to access a DynamoDB table. Each worker establishes its own connection to the database.

I have added AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to the spark-env.sh files on both the master and the workers. I have also run the file with sh to make sure the variables are exported.
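
For reference, the relevant lines in spark-env.sh look roughly like this (a sketch; the key values are placeholders):

    export AWS_ACCESS_KEY_ID=<your-access-key-id>
    export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>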

When the code runs, I always get this error:

    Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:131)
    at com.amazonaws.http.AmazonHttpClient.getCredentialsFromContext(AmazonHttpClient.java:774)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:800)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:695)
    at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:447)
    at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:409)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:358)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2051)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2021)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1299)
    at com.amazon.titan.diskstorage.dynamodb.DynamoDBDelegate.describeTable(DynamoDBDelegate.java:635)
    ... 27 more

It looks like the AWS SDK fails to load the credentials even though they are exported. Which solution should I try?

You can use the setExecutorEnv method on SparkConf. From the SparkConf source:

  /**
   * Set an environment variable to be used when launching executors for this application.
   * These variables are stored as properties of the form spark.executorEnv.VAR_NAME
   * (for example spark.executorEnv.PATH) but this method makes them easier to set.
   */
  def setExecutorEnv(variable: String, value: String): SparkConf = {
    set("spark.executorEnv." + variable, value)
  }

and also:

  /**
   * Set multiple environment variables to be used when launching executors.
   * These variables are stored as properties of the form spark.executorEnv.VAR_NAME
   * (for example spark.executorEnv.PATH) but this method makes them easier to set.
   */
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf = {
    for ((k, v) <- variables) {
      setExecutorEnv(k, v)
    }
    this
  }
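
A minimal usage sketch of the first overload (the app name and reading the keys from the driver's environment via sys.env are assumptions, not part of the original):

  import org.apache.spark.{SparkConf, SparkContext}

  // Forward the AWS credentials from the driver's environment to every
  // executor; they are stored as spark.executorEnv.* properties and exported
  // into the environment of each executor process at launch.
  val conf = new SparkConf()
    .setAppName("dynamodb-app") // hypothetical application name
    .setExecutorEnv("AWS_ACCESS_KEY_ID", sys.env("AWS_ACCESS_KEY_ID"))
    .setExecutorEnv("AWS_SECRET_ACCESS_KEY", sys.env("AWS_SECRET_ACCESS_KEY"))

  val sc = new SparkContext(conf)

Once the executors are launched with these variables, the AWS SDK's environment-variable credentials provider should be able to find them when each worker opens its DynamoDB connection.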

You might also consider other options, such as setting Java system properties: SparkConf will pick them up automatically.
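
A sketch of that variant (the property names reuse the spark.executorEnv.* form shown in the source above; reading the keys via sys.env is again an assumption):

  import org.apache.spark.SparkConf

  // The no-argument SparkConf constructor (loadDefaults = true) copies every
  // JVM system property whose name starts with "spark." into the new conf,
  // so these properties end up equivalent to calling setExecutorEnv.
  System.setProperty("spark.executorEnv.AWS_ACCESS_KEY_ID", sys.env("AWS_ACCESS_KEY_ID"))
  System.setProperty("spark.executorEnv.AWS_SECRET_ACCESS_KEY", sys.env("AWS_SECRET_ACCESS_KEY"))

  val conf = new SparkConf() // the spark.* system properties are picked up here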