Dataproc 中的 BigQuery 和 Pyspark
BigQuery and Pyspark in Dataproc
我在 BigQuery 中有一个 table 我想查询并实现 FPgrowth 算法。
我想先在 pyspark shell 上使用 dataproc 集群的 VM 实例进行尝试。
我正在寻找一种使用 pyspark 直接查询 BQ 中的 table 的方法。我想用查询出来的数据来实现FPGrowth(我已经很熟悉了)
Dataproc 已经具有可用于通过 BigQuery 进行查询的必要连接器,如您在 docs.
中所见
来自文档的代码示例:
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
我还建议创建一个安装了 Jupyter service 的 Dataproc 集群。这将使您有可能即时测试如何实施 FPgrowth 或您最终想尝试的任何其他想法。
事实上,在写这个答案之前,我只是用我现在的jupyter notebook查询了BQ,看看它是如何工作的:
添加到 Willian Fuks 的回答(没有足够的代表直接回答,对此深表歉意):
Dataproc 在 Big Query 和 PySpark 之间的集成存在的一个问题是,您实际上无法利用 BigQueries 查询功能来预过滤要在 Spark 中使用的数据。
当您使用 Willian 提供的示例时,实际发生的是引用 table 上的所有数据都被复制到一个临时位置,以便 Spark 可以访问它。
您可能想要使用的一个选项 - 如果它对您的用例有意义 - 在大查询中使用您感兴趣的数据子集创建临时 table(您可以在 BigQuery 中查询, 并将结果保存到一个新的 table) 中。然后,您 link 改为 table,并使用 PySpark 完成其余工作。
我在 BigQuery 中有一个 table 我想查询并实现 FPgrowth 算法。 我想先在 pyspark shell 上使用 dataproc 集群的 VM 实例进行尝试。
我正在寻找一种使用 pyspark 直接查询 BQ 中的 table 的方法。我想用查询出来的数据来实现FPGrowth(我已经很熟悉了)
Dataproc 已经具有可用于通过 BigQuery 进行查询的必要连接器,如您在 docs.
中所见来自文档的代码示例:
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
我还建议创建一个安装了 Jupyter service 的 Dataproc 集群。这将使您有可能即时测试如何实施 FPgrowth 或您最终想尝试的任何其他想法。
事实上,在写这个答案之前,我只是用我现在的jupyter notebook查询了BQ,看看它是如何工作的:
添加到 Willian Fuks 的回答(没有足够的代表直接回答,对此深表歉意):
Dataproc 在 Big Query 和 PySpark 之间的集成存在的一个问题是,您实际上无法利用 BigQueries 查询功能来预过滤要在 Spark 中使用的数据。
当您使用 Willian 提供的示例时,实际发生的是引用 table 上的所有数据都被复制到一个临时位置,以便 Spark 可以访问它。
您可能想要使用的一个选项 - 如果它对您的用例有意义 - 在大查询中使用您感兴趣的数据子集创建临时 table(您可以在 BigQuery 中查询, 并将结果保存到一个新的 table) 中。然后,您 link 改为 table,并使用 PySpark 完成其余工作。