导入和使用 google 云包
importing and using google cloud packages
我创建了一个管道,它输出了一个数字列表。
这些数字流入 ParDo,在 ParDo 中,我使用数字查询 Bigquery table,然后我 return 查询结果。
这在本地有效。 Linux、Python 3.7、google-cloud-bigquery 1.22.0
当我将作业上传到数据流时,事情变得很有趣。
我在顶层所做的一切在下面的功能中都无效。所以我必须在每个函数中导入所有用过的包才能使用。
这太丑了,我怀疑我做错了什么。但是什么?
所以我得到了这样一个函数:
def flatten(elements):
import datetime
for element in elements['transactionids']:
print('flatten: ' + str(element) + ' ' + datetime.datetime.now().isoformat())
yield element
我得到一个 'DoFn Class' 这样的:
class TransformTransaction(beam.DoFn):
def setup(self):
print("This will never run. Why?")
def start_bundle(self):
print("Bundle Start")
from google.cloud import bigquery
self.client = bigquery.Client()
self.dataset_id = 'mydataset'
self.table_id = 'testhijs'
self.table_ref = self.client.dataset(self.dataset_id).table(self.table_id)
self.table = self.client.get_table(self.table_ref)
def retrieveTransactionData(self, transactionID):
query = f"select transactionID, someNr from `thijs-dev.thijsset.thijstable` " \
f"where transactionID = {transactionID}"
query_job = self.client.query(
query,
location="EU",
)
print(query_job)
transactions = []
for row in query_job:
transactions.append(row)
return transactions
使用管道配置--save_main_session
。这将导致全局命名空间的状态被 pickle 并加载到 Cloud Dataflow worker 上。
Python 中的完整示例:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=proj',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=name',
'--setup_file=./setup.py'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True #this is what you need to include
编辑:link to doc
我创建了一个管道,它输出了一个数字列表。 这些数字流入 ParDo,在 ParDo 中,我使用数字查询 Bigquery table,然后我 return 查询结果。
这在本地有效。 Linux、Python 3.7、google-cloud-bigquery 1.22.0
当我将作业上传到数据流时,事情变得很有趣。 我在顶层所做的一切在下面的功能中都无效。所以我必须在每个函数中导入所有用过的包才能使用。
这太丑了,我怀疑我做错了什么。但是什么?
所以我得到了这样一个函数:
def flatten(elements):
import datetime
for element in elements['transactionids']:
print('flatten: ' + str(element) + ' ' + datetime.datetime.now().isoformat())
yield element
我得到一个 'DoFn Class' 这样的:
class TransformTransaction(beam.DoFn):
def setup(self):
print("This will never run. Why?")
def start_bundle(self):
print("Bundle Start")
from google.cloud import bigquery
self.client = bigquery.Client()
self.dataset_id = 'mydataset'
self.table_id = 'testhijs'
self.table_ref = self.client.dataset(self.dataset_id).table(self.table_id)
self.table = self.client.get_table(self.table_ref)
def retrieveTransactionData(self, transactionID):
query = f"select transactionID, someNr from `thijs-dev.thijsset.thijstable` " \
f"where transactionID = {transactionID}"
query_job = self.client.query(
query,
location="EU",
)
print(query_job)
transactions = []
for row in query_job:
transactions.append(row)
return transactions
使用管道配置--save_main_session
。这将导致全局命名空间的状态被 pickle 并加载到 Cloud Dataflow worker 上。
Python 中的完整示例:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=proj',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=name',
'--setup_file=./setup.py'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True #this is what you need to include
编辑:link to doc