在数据流中包含其他文件

include other files with dataflow

我的数据流使用 .sql 文件。该文件包含一个查询,位于名为 queries.

的目录中

我需要将此文件与我的数据流一起上传。

我发现的是 manifest.in 文件的使用,但据我所知,它没有做任何事情,我在我的根目录中创建了一个名为 MANIFEST.in 的文件,它包含一行:

recursive-include queries *

一些其他消息来源告诉我,我需要为此使用 setup.py 文件。所以现在看起来像这样:

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build

import setuptools  # pylint: disable-all
setuptools.setup(
    name='MarkPackage',
    version='0.0.1',
    install_requires=[],
    packages=setuptools.find_packages(),
    package_data={
        'queries': ['queries/*'],
    },
    include_package_data=True
)

这也不行。 错误是:RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: 'queries/testquery.sql' [while running 'generatedPtransform-20']

将任何文件包含在我的数据流的任何或所有部分中使用的最佳做法是什么?

这取决于您对要包含的文件所做的操作,但考虑到这是一个 SQL 文件(而不是本地 Python 包或非 Python 依赖)"including" 的一种方式是将其放入 Google Cloud Storage 存储桶并将其添加为参数:

def run(argv=None): 
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://bucket/queries/query.sql',
        help='Input SQL file.'
        )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
          '--runner=DataflowRunner',
          '--project=proj',
          '--region=region',
          '--staging_location=gs://bucket/staging/',
          '--temp_location=gs://bucket/temp/',
          '--job_name=name',
          '--setup_file=./setup.py'
          ]) 

现在,如果您需要将此文件用作 PTransform 中的参数,您可以将 known_args.input 传递给它。希望这有帮助

请考虑使用 filesToStage,遵循 existing SO answer 中描述的模式。这将允许您提供一个文件。此方法有一些 "gotchas",因此请仔细查看答案。

不幸的是, 是一个 java 特定的解决方案。使用资源文件夹将配置文件打包到 jar 中。然后使用 java 提供的 API 来读回文件。

这个解决方案是由我们的 Google 云顾问带给我的。它有效,但不建议这样做,因为它只是为了将 SQL 查询与 Python 代码分开而增加了复杂性。 另一种方法是在 Bigquery 上创建一个包含此 SQL 代码的视图,并在 Bigquery 环境中维护它。

MANIFEST.in
include query.sql

setup.py

import setuptools
setuptools.setup(
    name="example",
    version="0.0.1",
    install_requires=[],
    packages=setuptools.find_packages(),
    data_files=[(".", ["query.sql"])],
    include_package_data=True,
)

main.py

with open ("query.sql", "r") as myfile:
        query=myfile.read()
    with beam.Pipeline(argv=pipeline_args) as p:
        rows = p | "ReadFromBQ" >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True)
        )
        rows | "writeToBQ" >> beam.io.Write(
            "BQ Write"
            >> beam.io.WriteToBigQuery(
                known_args.output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )