Dataproc 上 PySpark 中的 BigQuery 连接器 ClassNotFoundException
BigQuery connector ClassNotFoundException in PySpark on Dataproc
我正在尝试使用 Dataproc 运行 PySpark 中的脚本。
脚本是 this example 和我需要做的事情之间的一种合并,因为我想检查是否一切正常。显然不是。
我得到的错误是:
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat
我确保我拥有所有的罐子,并按照其他类似帖子中的建议添加了一些新的罐子。我还检查了 SPARK_HOME
变量。
下面你可以看到代码;尝试实例化 table_data.
时出现错误
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
正如 example 中所指出的,您需要在提交作业时包含 BigQuery 连接器 jar。
通过 Dataproc 作业API:
gcloud dataproc jobs submit pyspark --cluster=${CLUSTER} \
/path/to/your/script.py \
--jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
或 spark-submit
来自集群内部:
spark-submit --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar \
/path/to/your/script.py
我正在尝试使用 Dataproc 运行 PySpark 中的脚本。
脚本是 this example 和我需要做的事情之间的一种合并,因为我想检查是否一切正常。显然不是。
我得到的错误是:
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. : java.lang.ClassNotFoundException: com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat
我确保我拥有所有的罐子,并按照其他类似帖子中的建议添加了一些新的罐子。我还检查了 SPARK_HOME
变量。
下面你可以看到代码;尝试实例化 table_data.
时出现错误"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
正如 example 中所指出的,您需要在提交作业时包含 BigQuery 连接器 jar。
通过 Dataproc 作业API:
gcloud dataproc jobs submit pyspark --cluster=${CLUSTER} \
/path/to/your/script.py \
--jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
或 spark-submit
来自集群内部:
spark-submit --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar \
/path/to/your/script.py