如何将参数传递给数据流模板以进行管道构建
How to pass parameter to dataflow template for pipeline construction
我正在尝试进行这样的祖先查询 example 并将其转移到模板版本。
问题是参数 ancestor_id 在管道构造期间用于函数 make_query 。
如果我在创建和暂存模板时不传递它,我将得到 RuntimeValueProviderError: RuntimeValueProvider(option: ancestor_id, type: int).get() not called from a runtime context。但是,如果我在创建模板时传递它,它似乎是一个在我执行模板时永远不会改变的 StaticValueProvider。
将参数传递给模板以进行管道构建的正确方法是什么?
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter
class Test(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--ancestor_id', type=int)
def make_query(ancestor_id):
ancestor = entity_pb2.Key()
datastore_helper.add_key_path(ancestor, KIND, ancestor_id)
query = query_pb2.Query()
datastore_helper.set_kind(query, KIND)
datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor)
return query
pipeline_options = PipelineOptions()
test_options = pipeline_options.view_as(TestOptions)
with beam.Pipeline(options=pipline_options) as p:
entities = p | ReadFromDatastore(PROJECT_ID, make_query(test_options.ancestor_id.get()))
两个问题。
ValueProvider.value.get()
方法只能运行在运行时的方法如ParDo.process()
。见 example.
此外,您面临的挑战是您正在使用 Google Cloud Datastore IO(来自数据存储的查询)。截至今天(2018 年 5 月),
official documentation 表示 Datastore IO NOT 尚未接受 运行 时间模板参数。
对于python,特别是
The following connectors accept runtime parameters.
File-based IOs: textio, avroio, tfrecordio
一个解决方法:您可能可以先运行一个不带任何模板参数的查询来获取实体的PCollection。此时,由于任何转换器都可以接受模板化参数,因此您可以将其用作 filter。但这取决于您的用例,它可能不适用于您。
我正在尝试进行这样的祖先查询 example 并将其转移到模板版本。
问题是参数 ancestor_id 在管道构造期间用于函数 make_query 。
如果我在创建和暂存模板时不传递它,我将得到 RuntimeValueProviderError: RuntimeValueProvider(option: ancestor_id, type: int).get() not called from a runtime context。但是,如果我在创建模板时传递它,它似乎是一个在我执行模板时永远不会改变的 StaticValueProvider。
将参数传递给模板以进行管道构建的正确方法是什么?
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter
class Test(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--ancestor_id', type=int)
def make_query(ancestor_id):
ancestor = entity_pb2.Key()
datastore_helper.add_key_path(ancestor, KIND, ancestor_id)
query = query_pb2.Query()
datastore_helper.set_kind(query, KIND)
datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor)
return query
pipeline_options = PipelineOptions()
test_options = pipeline_options.view_as(TestOptions)
with beam.Pipeline(options=pipline_options) as p:
entities = p | ReadFromDatastore(PROJECT_ID, make_query(test_options.ancestor_id.get()))
两个问题。
ValueProvider.value.get()
方法只能运行在运行时的方法如ParDo.process()
。见 example.此外,您面临的挑战是您正在使用 Google Cloud Datastore IO(来自数据存储的查询)。截至今天(2018 年 5 月), official documentation 表示 Datastore IO NOT 尚未接受 运行 时间模板参数。
对于python,特别是
The following connectors accept runtime parameters. File-based IOs: textio, avroio, tfrecordio
一个解决方法:您可能可以先运行一个不带任何模板参数的查询来获取实体的PCollection。此时,由于任何转换器都可以接受模板化参数,因此您可以将其用作 filter。但这取决于您的用例,它可能不适用于您。