Can't get Apache Airflow to write to S3 using EMR Operators

I am using the Airflow EMR Operators to create an AWS EMR cluster, run a Jar file stored in S3, and then write the output back to S3. It seems to be able to run the job using the Jar file from S3, but I can't get it to write the output to S3. When running it as an AWS EMR CLI Bash command, I was able to get it to write the output to S3, but I need to do it using the Airflow EMR Operators. I have set the S3 output directory in both the Airflow step configuration and the Jar file's environment configuration, but I still can't get the Operators to write to it.
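
For reference, the manual submission that did write its output to S3 was done outside of Airflow. A rough boto3 sketch of that submission is shown below (sketch only: the real run used the aws emr CLI, and the region, cluster id, and S3 paths here are placeholders, not the actual values):

import boto3

# Region, cluster id, and S3 paths are placeholders for illustration only.
emr = boto3.client("emr", region_name="us-east-1")

response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",
    Steps=[
        {
            "Name": "run-custom-create-emr",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit", "--deploy-mode", "cluster", "--master", "yarn",
                    "--class", "CLASSPATH",
                    "s3://INPUT_JAR_FILE",
                    "s3://OUTPUT_DIR"
                ]
            }
        }
    ]
)
print(response["StepIds"])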

Here is my Airflow DAG code:

from datetime import datetime, timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

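# Default arguments applied to every task in the DAG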
DEFAULT_ARGS = {
    'owner': 'AIRFLOW_USER',
    'depends_on_past': False,
    'start_date':  datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

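# Single Spark step: command-runner.jar invokes spark-submit; the last
# argument is the S3 output directory passed to the application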
RUN_STEPS = [
    {
        "Name": "run-custom-create-emr",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster", "--master", "yarn", "--conf",
                "spark.yarn.submit.waitAppCompletion=false", "--class", "CLASSPATH",
                "s3://INPUT_JAR_FILE",
                "s3://OUTPUT_DIR"
            ]
        }
    }
]

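# Cluster configuration; EmrCreateJobFlowOperator applies these overrides on
# top of the job flow settings stored in the EMR connection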
JOB_FLOW_OVERRIDES = {
    "Name": "JOB_NAME",
    "LogUri": "s3://LOG_DIR/",
    "ReleaseLabel": "emr-5.23.0",
    "Instances": {
        "Ec2KeyName": "KP_USER_NAME",
        "Ec2SubnetId": "SUBNET",
        "EmrManagedMasterSecurityGroup": "SG-ID",
        "EmrManagedSlaveSecurityGroup": "SG-ID",
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            }
        ],
        "TerminationProtected": True,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    "Applications": [
        {
            "Name": "Spark"
        },
        {
            "Name": "Ganglia"
        },
        {
            "Name": "Hadoop"
        },
        {
            "Name": "Hive"
        }
    ],
    "JobFlowRole": "ROLE_NAME",
    "ServiceRole": "ROLE_NAME",
    "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
    "EbsRootVolumeSize": 10,
    "Tags": [
        {
            "Key": "Country",
            "Value": "us"
        },
        {
            "Key": "Environment",
            "Value": "dev"
        }
    ]
}

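# Manually triggered DAG (schedule_interval=None)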
dag = DAG(
    'AWS-EMR-JOB',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None
)

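# Create the EMR cluster (job flow); the new job flow id is pushed to XCom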
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_connection_CustomCreate',
    dag=dag
)

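# Add the Spark step to the cluster; the returned step ids are pushed to XCom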
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=RUN_STEPS,
    dag=dag
)

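# Poll the step until it reaches a terminal state; the task fails if the step fails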
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

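# Terminate the cluster once the step has completed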
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

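# create_job_flow -> add_steps -> watch_step -> remove_cluster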
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

Does anyone know how to solve this problem? Any help would be greatly appreciated.

I believe I have just solved my problem. After really digging into all of the local Airflow logs and the S3 EMR logs, I found a Hadoop memory exception, so I increased the number of cores running the EMR job and it now seems to work.
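
Roughly, the fix amounts to scaling up the CORE instance group in JOB_FLOW_OVERRIDES so that YARN has more memory to work with. The fragment below shows the kind of change to the "InstanceGroups" section; the instance type and count are illustrative, not the exact values used:

        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.large",
                "InstanceCount": 1
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                # Illustrative sizes: a larger instance type and/or more core
                # nodes give YARN more memory for the Spark application.
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2
            }
        ],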