BigQuery connector for Spark on Dataproc - cannot authenticate using service account key file
I have followed Use the BigQuery connector with Spark and successfully fetched data from a publicly available dataset. I now need to access a BigQuery dataset owned by one of our clients, for which I have been given a service account key file (I know the key file is valid because I can use it to connect with the Google BigQuery library for Python).
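(For reference, a minimal sketch of the kind of check I used to confirm the key file works, assuming the google-cloud-bigquery package is installed; the /tmp/keyfile.json path and project name are illustrative:)

from google.cloud import bigquery

# Build a BigQuery client directly from the service account JSON key file
# (path and project are illustrative) and run a trivial query to prove the
# key is accepted.
client = bigquery.Client.from_service_account_json('/tmp/keyfile.json',
                                                   project='clientproject')
print(list(client.query('SELECT 1 AS ok').result()))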
I followed Igor Dvorzhak's advice here:
To use service account key file authorization you need to set mapred.bq.auth.service.account.enable property to true and point BigQuery connector to a service account json keyfile using mapred.bq.auth.service.account.json.keyfile property
like so:
from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder.appName("SparkSessionBQExample").enableHiveSupport().getOrCreate()

bucket = spark._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = spark._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input{}'.format(bucket, datetime.now().strftime("%Y%m%d%H%M%S"))

project_id = 'clientproject'  # 'publicdata'
dataset_id = 'clientdataset'  # 'samples'
table_id = 'clienttable'  # 'shakespeare'

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project_id,
    'mapred.bq.input.dataset.id': dataset_id,
    'mapred.bq.input.table.id': table_id,
    'mapred.bq.auth.service.account.enable': 'true'
}

# Load data in from BigQuery.
table_data = spark.sparkContext.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

print('row tally={}'.format(table_data.toDF().count()))
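(As an aside, each element of table_data is a (row offset, JSON string) pair; below is a minimal sketch of how one might parse the values into a DataFrame, assuming nothing about the client table's schema beyond it being flat JSON:)

import json

# Parse the JSON string in each (key, value) pair into a Python dict, then
# let Spark infer a schema from the dicts (fine for a quick look, though
# schema inference from dicts is deprecated in newer Spark versions).
json_rows = table_data.map(lambda kv: json.loads(kv[1]))
df = spark.createDataFrame(json_rows)
df.show(5)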
I have placed the service account key file at /tmp/keyfile.json on the cluster's master node and on all worker nodes, and I then submit my job like this:
gcloud dataproc jobs submit pyspark \
./bq_pyspark.py \
--cluster $CLUSTER \
--region $REGION \
--properties=spark.hadoop.mapred.bq.auth.service.account.json.keyfile=/tmp/keyfile.json
I have also tried:
gcloud dataproc jobs submit pyspark \
./bq_pyspark.py \
--cluster $CLUSTER \
--region $REGION \
--properties=spark.hadoop.mapred.bq.auth.service.account.json.keyfile=/tmp/keyfile.json,spark.hadoop.mapred.bq.auth.service.account.enable=true
Here are the relevant parts of the job output:
Bigquery connector version 0.10.7-hadoop2
18/11/07 13:36:47 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from default credential.
18/11/07 13:36:47 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from given credential.
18/11/07 13:36:47 INFO com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration: Using working path: 'gs://dataproc-9e5dc592-1a35-42e6-9dd6-5f9dd9c8df87-europe-west1/hadoop/tmp/bigquery/pyspark_input20181107133646'
Traceback (most recent call last):
File "/tmp/b6973a26c76d4069a86806dfbd2d7d0f/bq_pyspark.py", line 30, in
conf=conf)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 702, in newAPIHadoopRDD
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "Access Denied: Table clientproject:clientdatatset.clienttable: The user mydataprocserviceaccount@myproject.iam.gserviceaccount.com does not have bigquery.tables.get permission for table clientproject:clientdatatset.clienttable.",
"reason" : "accessDenied"
} ],
"message" : "Access Denied: Table clientproject:clientdatatset.clienttable: The user mydataprocserviceaccount@myproject.iam.gserviceaccount.com does not have bigquery.tables.get permission for table clientproject:clientdatatset.clienttable."
}
The line
18/11/07 13:36:47 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from default credential.
probably suggests that I am not passing the credentials from the service account key file through correctly, so I guess I have misunderstood what Igor said (or some information is missing).
I would be grateful if anyone could tell me where I'm going wrong.
Update...
I tried supplying the required authentication configuration in code rather than on the command line:
conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project_id,
    'mapred.bq.input.dataset.id': dataset_id,
    'mapred.bq.input.table.id': table_id,
    'mapred.bq.auth.service.account.enable': 'true',
    'mapred.bq.auth.service.account.keyfile': '/tmp/keyfile.json',
    'mapred.bq.auth.service.account.email': 'username@clientproject.iam.gserviceaccount.com'
}
This time I got a different error:
18/11/07 16:44:21 INFO com.google.cloud.hadoop.io.bigquery.BigQueryFactory: Creating BigQuery from default credential.
Traceback (most recent call last):
File "/tmp/cb5cbb16d59945dd926cab2c1f2f5524/bq_pyspark.py", line 39, in
conf=conf)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 702, in newAPIHadoopRDD
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.io.IOException: toDerInputStream rejects tag type 123
at sun.security.util.DerValue.toDerInputStream(DerValue.java:881)
at sun.security.pkcs12.PKCS12KeyStore.engineLoad(PKCS12KeyStore.java:1939)
at java.security.KeyStore.load(KeyStore.java:1445)
at com.google.api.client.util.SecurityUtils.loadKeyStore(SecurityUtils.java:82)
at com.google.api.client.util.SecurityUtils.loadPrivateKeyFromKeyStore(SecurityUtils.java:115)
at com.google.api.client.googleapis.auth.oauth2.GoogleCredential$Builder.setServiceAccountPrivateKeyFromP12File(GoogleCredential.java:670)
at com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromPrivateKeyServiceAccount(CredentialFactory.java:251)
at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:100)
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.createBigQueryCredential(BigQueryFactory.java:95)
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQuery(BigQueryFactory.java:115)
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQueryHelper(BigQueryFactory.java:103)
Googling "toDerInputStream rejects tag type 123" led me to toDerInputStream rejects tag type 123, which suggests I need to authenticate with a P12 file. That is consistent with the mention of sun.security.pkcs12.PKCS12KeyStore in the call stack. So I thought I needed a P12 file (a.k.a. a PKCS#12 format file) rather than a .json file, which would mean going back to the client to ask for one; in my experience that could take some time to obtain. I'll report back if/when I get anywhere.
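(In hindsight there is a simpler reading of that error, illustrated by the tiny check below: DER tag type 123 is just the ASCII code for '{', the first byte of a JSON document, i.e. the connector was handing my JSON key file to a P12/keystore parser.)

# "toDerInputStream rejects tag type 123": 123 is the ASCII code for '{',
# the first character of a JSON key file, so the PKCS#12/DER parser was
# being fed the JSON file rather than an actual P12 keystore.
print(ord('{'))  # prints 123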
Update 2... Figured it out with Igor's help. I had incorrectly specified mapred.bq.auth.service.account.keyfile when it needs to be mapred.bq.auth.service.account.json.keyfile. So the relevant section of the code becomes:
conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project_id,
    'mapred.bq.input.dataset.id': dataset_id,
    'mapred.bq.input.table.id': table_id,
    'mapred.bq.auth.service.account.enable': 'true',
    'mapred.bq.auth.service.account.json.keyfile': '/tmp/keyfile.json'
}

table_data = spark.sparkContext.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)
and the submit command is simply:
gcloud dataproc jobs submit pyspark \
./bq_pyspark.py \
--cluster $CLUSTER \
--region $REGION
That works: I can now access data in BigQuery from Spark on Dataproc, authenticating with a service account json key file. Thanks Igor.
The problem seems to be here:
Warning: Ignoring non-spark config property: mapred.bq.auth.service.account.json.keyfile=/tmp/keyfile.json
To fix this, you should set Hadoop properties with the spark.hadoop prefix when configuring Spark:
gcloud dataproc jobs submit pyspark ./bq_pyspark.py \
--cluster $CLUSTER --region $REGION \
--properties=spark.hadoop.mapred.bq.auth.service.account.json.keyfile=/tmp/keyfile.json
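Alternatively, the same Hadoop properties can be set from inside the job on the session's Hadoop configuration (a sketch, reusing the illustrative /tmp/keyfile.json path from above), which is effectively what the conf dict in Update 2 achieves:

# Equivalent in-code configuration: set the connector's auth properties on
# the Hadoop configuration of the running SparkSession instead of passing
# them via --properties on the gcloud command line.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set('mapred.bq.auth.service.account.enable', 'true')
hadoop_conf.set('mapred.bq.auth.service.account.json.keyfile', '/tmp/keyfile.json')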