Apache beam Python:条件和中断

Apache beam Python: condition and break

我对此进行了很多搜索,但没有任何有用的信息。

是否可以构建具有 if 条件的管道?在某些情况下,破坏管道?

我可以通过使用 with_outputs 并创建多个分支来部分实现。但是如果我想 运行 基于输入参数的不同管道,所有分支都被执行并且所有分支都创建输出文件,甚至一些分支有空输入。

我创建了一个包含各种条件的示例管道。

代码功能简介如下:

  1. 创建 parser(输入)参数,主要是来自 Google 存储桶的输入和输出文件目标。一个参数将是一个布尔值。
  2. 创建管道参数,例如指定 --runnertemp & staging 位置。
  3. 设置管道选项。
  4. 管道中的内容仅在变量run_pipelineTrue时执行。
  5. 根据输入参数my_condition,确定管道中读取的文件。
  6. 以上文件写入到Google Bucket
  7. 创建一个新的PCollection,只有当输入条件my_conditionTrue时才输出。

总的来说,这条管道展示了:

  • 构建具有 if 条件的管道
  • 在某些情况下,管道损坏
  • 运行 部分管道基于输入参数

如何运行代码:

  1. 将代码复制并粘贴到 python 文件中
  2. 在文件所在位置打开终端,然后输入python code_file_name.py。在 运行 代码之前,您需要编辑存储桶的名称(为您自己的名称)。

代码:

import argparse

#   import apache beam PTransforms
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None, save_main_session=True):
    """Main entry point."""

    parser = argparse.ArgumentParser()  # creates a parser

    # output & input destinations are from a Google Bucket
    parser.add_argument(  # add first output command & destination
        '--output_1',
        dest='output_1',
        default='gs://my-bucket/output_1',
        help='Output file to write results to.')
    parser.add_argument(  # add second output command & destination
        '--output_2',
        dest='output_2',
        default='gs://my-bucket/output_2',
        help='Output file (2) to write results to.')
    parser.add_argument(  # add first input command & destination
        '--input_1',
        dest='input_1',
        default='gs://my-bucket/input_1.txt',
        help='First input file to process')
    parser.add_argument(  # add second input command & destination
        '--input_2',
        dest='input_2',
        default='gs://my-bucket/input_2.txt',
        help='Second input file to process')
    parser.add_argument(  # add conditional command & destination
        '--my_condition',
        dest='my_condition',
        default=True,
        help='Condition for pipeline')

    # store our parser arguments in 'known_args'
    known_args, pipeline_args = parser.parse_known_args(argv)
    # pipeline arguments are below:
    pipeline_args.extend([
        # --runner=DataflowRunner',  # you can run this with Dataflow, but it will take longer
        '--runner=DirectRunner',
        '--project=my-project',
        '--staging_location=gs://my-bucket/staging',
        '--temp_location=gs://my-bucket/temp',
        '--job_name=demo-job',
    ])

    # setting up pipeline options

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # the pipeline
    with beam.Pipeline(options=pipeline_options) as p:

        if run_pipeline:

            if known_args.my_condition:
                pipeline_read_file = known_args.input_1
            else:
                pipeline_read_file = known_args.input_2

            # PCollection containing contents of an input file, based on conditional
            pcoll = (
                p | beam.io.ReadFromText(pipeline_read_file)
            )

            # Write our PCollection to a Google Bucket
            pcoll | "Write PCollection To Bucket" >> beam.io.WriteToText(known_args.output_1, num_shards=1)

            # PCollection that will only be written based on an input condition
            pcoll_break_cond = (
                p | beam.Create([{"key": "value"}])
            )

            if known_args.my_condition:
                pcoll_break_cond | "Write Break Condition PCollection To Bucket" >> beam.io.WriteToText(known_args.output_2, num_shards=1)


if __name__ == '__main__':
    run_pipeline = True     # This variable determines if the pipeline is run at all
    run()