通过 Airflow 中的 PythonVirtualenvOperator 多次成功的数据流管道 运行

Successful Dataflow Pipeline being run multiple times via PythonVirtualenvOperator in Airflow

我正在 运行 正在使用 Apache Airflow 编排 Apache Beam 管道(使用 Google Dataflow 部署)。

DAG 文件如下所示:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator

import custom_py_file #beam job in this file 


default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['email@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj'

with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:


    lines = PythonVirtualenvOperator(
        task_id='lines',
        python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared 
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

lines

beam流水线文件(custom_py_file.py)如下:

def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import time


    class ETL(beam.DoFn):
        def process(self, row):
            #process data 

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
            )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
              '--runner=DataflowRunner',
              '--project=proj',
              '--region=region',
              '--staging_location=gs://bucket/staging/',
              '--temp_location=gs://bucket/temp/',
              '--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
              '--setup_file=/home/airflow/gcs/dags/setup.py',
              '--disk_size_gb=350',
              '--machine_type=n1-highmem-96',
              '--num_workers=24',
              '--autoscaling_algorithm=NONE'
              ]) 

        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))

        p.run().wait_until_finish()

    logging.getLogger().setLevel(logging.DEBUG)
    run()

我正在使用 PythonVirtualenvOperator,因为我无法在当前版本的气流(版本:1.10.2-composer)中使用 Python3 和 BashOperator,我需要 Python3 到 运行 这个管道。

问题是,尽管 运行 成功,Airflow 还是提交了另一个数据流作业。请注意,这不是重试,因为日志显示所有 "one" 任务 运行。但是,Dataflow 日志显示它 运行 在已经 运行 成功一次后再次执行完全相同的作业。

这是怎么回事?成功的数据流作业是否没有输出 0 值?如果 运行 正确,我如何让它继续执行下一个任务?谢谢!

它不被视为重试并且在第一个作业结束后执行一个作业这一事实让我怀疑类似于 的事情。检查您的 Python 代码,我发现您同时调用了 with beam.Pipeline()p.run():

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()

这将触发连续两次执行。您可以执行任一选项(但不能同时执行):

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p = beam.Pipeline(options=pipeline_options)

rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()