在 Dataflow 中使用模板化参数构建动态 Datastore 查询
Using templated arguments in Dataflow to build dynamic Datastore query
我有从 Google Cloud Datastore 读取数据的 Apache Beam 管道。管道是 运行 在 Google Cloud Dataflow 中批处理模式,它写在 Python.
问题出在模板化参数上,我试图用它来创建带有动态时间戳过滤器的数据存储区查询。
流水线定义如下:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--filter', type=int)
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
user_options = pipeline_options.view_as(UserOptions)
data = (p
| 'Read' >> ReadFromDatastore(build_query(user_options.filter.get()))
| ...
和build_query
函数如下:
def build_query(filter):
return Query(
kind='Kind',
filters=[('timestamp', '>', filter)],
project='Project'
)
运行 这会导致错误 RuntimeValueProvider(...).get() not called from a runtime context
.
我也试过ReadFromDatastore(build_query(user_options.filter))
但是错误是ValueError: (u"Unknown protobuf attr type [while running 'Read/Read']", <class 'apache_beam.options.value_provider.RuntimeValueProvider'>)
。
如果从等式中删除模板化参数,一切都会正常工作,例如。像这样:ReadFromDatastore(build_query(1563276063))
。所以问题在于在构建数据存储区查询时使用模板化参数。
我的猜测是 build_query
应该以其他方式定义,但在花了一些时间查看文档和谷歌搜索后,我仍然不知道如何定义。
非常感谢任何可以解决此问题的建议!
编辑 1
实际上,在这种情况下,过滤器始终与当前时间戳相关,因此如果有其他方法可以使用动态值,则可能甚至不需要将其作为参数传递。尝试使用 ReadFromDatastore(build_query(int(time())-90000))
但连续两次运行包含完全相同的过滤器。
价值提供者需要得到您使用的来源的支持。只有在那里才能在适当的时候打开包装。
创建自己的源时,您显然可以完全控制它。使用 pre-existing 来源时,我只看到两个选项:
- 在创建模板时提供值,这意味着不要为其使用模板参数
- 为 pre-existing 来源创建 PR 以支持模板参数
我有从 Google Cloud Datastore 读取数据的 Apache Beam 管道。管道是 运行 在 Google Cloud Dataflow 中批处理模式,它写在 Python.
问题出在模板化参数上,我试图用它来创建带有动态时间戳过滤器的数据存储区查询。
流水线定义如下:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--filter', type=int)
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
user_options = pipeline_options.view_as(UserOptions)
data = (p
| 'Read' >> ReadFromDatastore(build_query(user_options.filter.get()))
| ...
和build_query
函数如下:
def build_query(filter):
return Query(
kind='Kind',
filters=[('timestamp', '>', filter)],
project='Project'
)
运行 这会导致错误 RuntimeValueProvider(...).get() not called from a runtime context
.
我也试过ReadFromDatastore(build_query(user_options.filter))
但是错误是ValueError: (u"Unknown protobuf attr type [while running 'Read/Read']", <class 'apache_beam.options.value_provider.RuntimeValueProvider'>)
。
如果从等式中删除模板化参数,一切都会正常工作,例如。像这样:ReadFromDatastore(build_query(1563276063))
。所以问题在于在构建数据存储区查询时使用模板化参数。
我的猜测是 build_query
应该以其他方式定义,但在花了一些时间查看文档和谷歌搜索后,我仍然不知道如何定义。
非常感谢任何可以解决此问题的建议!
编辑 1
实际上,在这种情况下,过滤器始终与当前时间戳相关,因此如果有其他方法可以使用动态值,则可能甚至不需要将其作为参数传递。尝试使用 ReadFromDatastore(build_query(int(time())-90000))
但连续两次运行包含完全相同的过滤器。
价值提供者需要得到您使用的来源的支持。只有在那里才能在适当的时候打开包装。
创建自己的源时,您显然可以完全控制它。使用 pre-existing 来源时,我只看到两个选项:
- 在创建模板时提供值,这意味着不要为其使用模板参数
- 为 pre-existing 来源创建 PR 以支持模板参数