通过 Airflow 中的 PythonVirtualenvOperator 多次成功的数据流管道 运行
Successful Dataflow Pipeline being run multiple times via PythonVirtualenvOperator in Airflow
我正在 运行 正在使用 Apache Airflow 编排 Apache Beam 管道(使用 Google Dataflow 部署)。
DAG 文件如下所示:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import custom_py_file #beam job in this file
default_args = {
'owner': 'name',
'depends_on_past': False,
'start_date': datetime(2016, 1, 1),
'email': ['email@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1),
}
CONNECTION_ID = 'proj'
with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
lines = PythonVirtualenvOperator(
task_id='lines',
python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared
requirements=['apache-beam[gcp]', 'pandas'],
python_version=3,
dag=dag
)
lines
beam流水线文件(custom_py_file.py
)如下:
def main():
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
import time
class ETL(beam.DoFn):
def process(self, row):
#process data
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://bucket/input/input.txt',
help='Input file to process.'
)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=proj',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
'--setup_file=/home/airflow/gcs/dags/setup.py',
'--disk_size_gb=350',
'--machine_type=n1-highmem-96',
'--num_workers=24',
'--autoscaling_algorithm=NONE'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
logging.getLogger().setLevel(logging.DEBUG)
run()
我正在使用 PythonVirtualenvOperator
,因为我无法在当前版本的气流(版本:1.10.2-composer)中使用 Python3 和 BashOperator
,我需要 Python3 到 运行 这个管道。
问题是,尽管 运行 成功,Airflow 还是提交了另一个数据流作业。请注意,这不是重试,因为日志显示所有 "one" 任务 运行。但是,Dataflow 日志显示它 运行 在已经 运行 成功一次后再次执行完全相同的作业。
这是怎么回事?成功的数据流作业是否没有输出 0 值?如果 运行 正确,我如何让它继续执行下一个任务?谢谢!
它不被视为重试并且在第一个作业结束后执行一个作业这一事实让我怀疑类似于 的事情。检查您的 Python 代码,我发现您同时调用了 with beam.Pipeline()
和 p.run()
:
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
这将触发连续两次执行。您可以执行任一选项(但不能同时执行):
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p = beam.Pipeline(options=pipeline_options)
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
我正在 运行 正在使用 Apache Airflow 编排 Apache Beam 管道(使用 Google Dataflow 部署)。
DAG 文件如下所示:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import custom_py_file #beam job in this file
default_args = {
'owner': 'name',
'depends_on_past': False,
'start_date': datetime(2016, 1, 1),
'email': ['email@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1),
}
CONNECTION_ID = 'proj'
with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
lines = PythonVirtualenvOperator(
task_id='lines',
python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared
requirements=['apache-beam[gcp]', 'pandas'],
python_version=3,
dag=dag
)
lines
beam流水线文件(custom_py_file.py
)如下:
def main():
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
import time
class ETL(beam.DoFn):
def process(self, row):
#process data
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://bucket/input/input.txt',
help='Input file to process.'
)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=proj',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
'--setup_file=/home/airflow/gcs/dags/setup.py',
'--disk_size_gb=350',
'--machine_type=n1-highmem-96',
'--num_workers=24',
'--autoscaling_algorithm=NONE'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
logging.getLogger().setLevel(logging.DEBUG)
run()
我正在使用 PythonVirtualenvOperator
,因为我无法在当前版本的气流(版本:1.10.2-composer)中使用 Python3 和 BashOperator
,我需要 Python3 到 运行 这个管道。
问题是,尽管 运行 成功,Airflow 还是提交了另一个数据流作业。请注意,这不是重试,因为日志显示所有 "one" 任务 运行。但是,Dataflow 日志显示它 运行 在已经 运行 成功一次后再次执行完全相同的作业。
这是怎么回事?成功的数据流作业是否没有输出 0 值?如果 运行 正确,我如何让它继续执行下一个任务?谢谢!
它不被视为重试并且在第一个作业结束后执行一个作业这一事实让我怀疑类似于 with beam.Pipeline()
和 p.run()
:
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
这将触发连续两次执行。您可以执行任一选项(但不能同时执行):
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p = beam.Pipeline(options=pipeline_options)
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()