Apache beam Python:条件和中断
Apache beam Python: condition and break
我对此进行了很多搜索,但没有任何有用的信息。
是否可以构建具有 if
条件的管道?在某些情况下,破坏管道?
我可以通过使用 with_outputs
并创建多个分支来部分实现。但是如果我想 运行 基于输入参数的不同管道,所有分支都被执行并且所有分支都创建输出文件,甚至一些分支有空输入。
我创建了一个包含各种条件的示例管道。
代码功能简介如下:
- 创建
parser
(输入)参数,主要是来自 Google 存储桶的输入和输出文件目标。一个参数将是一个布尔值。
- 创建管道参数,例如指定
--runner
和 temp
& staging
位置。
- 设置管道选项。
- 管道中的内容仅在变量
run_pipeline
为True
时执行。
- 根据输入参数
my_condition
,确定管道中读取的文件。
- 以上文件写入到Google Bucket
- 创建一个新的
PCollection
,只有当输入条件my_condition
为True
时才输出。
总的来说,这条管道展示了:
- 构建具有
if
条件的管道
- 在某些情况下,管道损坏
- 运行 部分管道基于输入参数
如何运行代码:
- 将代码复制并粘贴到 python 文件中
- 在文件所在位置打开终端,然后输入
python code_file_name.py
。在 运行 代码之前,您需要编辑存储桶的名称(为您自己的名称)。
代码:
import argparse
# import apache beam PTransforms
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
"""Main entry point."""
parser = argparse.ArgumentParser() # creates a parser
# output & input destinations are from a Google Bucket
parser.add_argument( # add first output command & destination
'--output_1',
dest='output_1',
default='gs://my-bucket/output_1',
help='Output file to write results to.')
parser.add_argument( # add second output command & destination
'--output_2',
dest='output_2',
default='gs://my-bucket/output_2',
help='Output file (2) to write results to.')
parser.add_argument( # add first input command & destination
'--input_1',
dest='input_1',
default='gs://my-bucket/input_1.txt',
help='First input file to process')
parser.add_argument( # add second input command & destination
'--input_2',
dest='input_2',
default='gs://my-bucket/input_2.txt',
help='Second input file to process')
parser.add_argument( # add conditional command & destination
'--my_condition',
dest='my_condition',
default=True,
help='Condition for pipeline')
# store our parser arguments in 'known_args'
known_args, pipeline_args = parser.parse_known_args(argv)
# pipeline arguments are below:
pipeline_args.extend([
# --runner=DataflowRunner', # you can run this with Dataflow, but it will take longer
'--runner=DirectRunner',
'--project=my-project',
'--staging_location=gs://my-bucket/staging',
'--temp_location=gs://my-bucket/temp',
'--job_name=demo-job',
])
# setting up pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# the pipeline
with beam.Pipeline(options=pipeline_options) as p:
if run_pipeline:
if known_args.my_condition:
pipeline_read_file = known_args.input_1
else:
pipeline_read_file = known_args.input_2
# PCollection containing contents of an input file, based on conditional
pcoll = (
p | beam.io.ReadFromText(pipeline_read_file)
)
# Write our PCollection to a Google Bucket
pcoll | "Write PCollection To Bucket" >> beam.io.WriteToText(known_args.output_1, num_shards=1)
# PCollection that will only be written based on an input condition
pcoll_break_cond = (
p | beam.Create([{"key": "value"}])
)
if known_args.my_condition:
pcoll_break_cond | "Write Break Condition PCollection To Bucket" >> beam.io.WriteToText(known_args.output_2, num_shards=1)
if __name__ == '__main__':
run_pipeline = True # This variable determines if the pipeline is run at all
run()
我对此进行了很多搜索,但没有任何有用的信息。
是否可以构建具有 if
条件的管道?在某些情况下,破坏管道?
我可以通过使用 with_outputs
并创建多个分支来部分实现。但是如果我想 运行 基于输入参数的不同管道,所有分支都被执行并且所有分支都创建输出文件,甚至一些分支有空输入。
我创建了一个包含各种条件的示例管道。
代码功能简介如下:
- 创建
parser
(输入)参数,主要是来自 Google 存储桶的输入和输出文件目标。一个参数将是一个布尔值。 - 创建管道参数,例如指定
--runner
和temp
&staging
位置。 - 设置管道选项。
- 管道中的内容仅在变量
run_pipeline
为True
时执行。 - 根据输入参数
my_condition
,确定管道中读取的文件。 - 以上文件写入到Google Bucket
- 创建一个新的
PCollection
,只有当输入条件my_condition
为True
时才输出。
总的来说,这条管道展示了:
- 构建具有
if
条件的管道 - 在某些情况下,管道损坏
- 运行 部分管道基于输入参数
如何运行代码:
- 将代码复制并粘贴到 python 文件中
- 在文件所在位置打开终端,然后输入
python code_file_name.py
。在 运行 代码之前,您需要编辑存储桶的名称(为您自己的名称)。
代码:
import argparse
# import apache beam PTransforms
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
"""Main entry point."""
parser = argparse.ArgumentParser() # creates a parser
# output & input destinations are from a Google Bucket
parser.add_argument( # add first output command & destination
'--output_1',
dest='output_1',
default='gs://my-bucket/output_1',
help='Output file to write results to.')
parser.add_argument( # add second output command & destination
'--output_2',
dest='output_2',
default='gs://my-bucket/output_2',
help='Output file (2) to write results to.')
parser.add_argument( # add first input command & destination
'--input_1',
dest='input_1',
default='gs://my-bucket/input_1.txt',
help='First input file to process')
parser.add_argument( # add second input command & destination
'--input_2',
dest='input_2',
default='gs://my-bucket/input_2.txt',
help='Second input file to process')
parser.add_argument( # add conditional command & destination
'--my_condition',
dest='my_condition',
default=True,
help='Condition for pipeline')
# store our parser arguments in 'known_args'
known_args, pipeline_args = parser.parse_known_args(argv)
# pipeline arguments are below:
pipeline_args.extend([
# --runner=DataflowRunner', # you can run this with Dataflow, but it will take longer
'--runner=DirectRunner',
'--project=my-project',
'--staging_location=gs://my-bucket/staging',
'--temp_location=gs://my-bucket/temp',
'--job_name=demo-job',
])
# setting up pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# the pipeline
with beam.Pipeline(options=pipeline_options) as p:
if run_pipeline:
if known_args.my_condition:
pipeline_read_file = known_args.input_1
else:
pipeline_read_file = known_args.input_2
# PCollection containing contents of an input file, based on conditional
pcoll = (
p | beam.io.ReadFromText(pipeline_read_file)
)
# Write our PCollection to a Google Bucket
pcoll | "Write PCollection To Bucket" >> beam.io.WriteToText(known_args.output_1, num_shards=1)
# PCollection that will only be written based on an input condition
pcoll_break_cond = (
p | beam.Create([{"key": "value"}])
)
if known_args.my_condition:
pcoll_break_cond | "Write Break Condition PCollection To Bucket" >> beam.io.WriteToText(known_args.output_2, num_shards=1)
if __name__ == '__main__':
run_pipeline = True # This variable determines if the pipeline is run at all
run()