在 Dataflow 中使用模板化参数构建动态 Datastore 查询

Using templated arguments in Dataflow to build dynamic Datastore query

我有从 Google Cloud Datastore 读取数据的 Apache Beam 管道。管道是 运行 在 Google Cloud Dataflow 中批处理模式,它写在 Python.

问题出在模板化参数上,我试图用它来创建带有动态时间戳过滤器的数据存储区查询。

流水线定义如下:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--filter', type=int)

pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p:

    user_options = pipeline_options.view_as(UserOptions)

    data = (p
        | 'Read' >> ReadFromDatastore(build_query(user_options.filter.get()))
        | ...

build_query函数如下:

def build_query(filter):
    return Query(
        kind='Kind',
        filters=[('timestamp', '>', filter)],
        project='Project'
    )

运行 这会导致错误 RuntimeValueProvider(...).get() not called from a runtime context.

我也试过ReadFromDatastore(build_query(user_options.filter))但是错误是ValueError: (u"Unknown protobuf attr type [while running 'Read/Read']", <class 'apache_beam.options.value_provider.RuntimeValueProvider'>)

如果从等式中删除模板化参数,一切都会正常工作,例如。像这样:ReadFromDatastore(build_query(1563276063))。所以问题在于在构建数据存储区查询时使用模板化参数。

我的猜测是 build_query 应该以其他方式定义,但在花了一些时间查看文档和谷歌搜索后,我仍然不知道如何定义。

非常感谢任何可以解决此问题的建议!

编辑 1

实际上,在这种情况下,过滤器始终与当前时间戳相关,因此如果有其他方法可以使用动态值,则可能甚至不需要将其作为参数传递。尝试使用 ReadFromDatastore(build_query(int(time())-90000)) 但连续两次运行包含完全相同的过滤器。

价值提供者需要得到您使用的来源的支持。只有在那里才能在适当的时候打开包装。

创建自己的源时,您显然可以完全控制它。使用 pre-existing 来源时,我只看到两个选项:

  1. 在创建模板时提供值,这意味着不要为其使用模板参数
  2. 为 pre-existing 来源创建 PR 以支持模板参数