尝试从本地 Airflow 运行 DataProcSparkOperator 任务时出现 HttpError 400
HttpError 400 when trying to run DataProcSparkOperator task from a local Airflow
我正在测试我曾经在 Google Composer 上 运行 在本地安装的 Airflow 上没有错误的 DAG。 DAG 启动 Google Dataproc 集群,运行 Spark 作业(位于 GS 存储桶上的 JAR 文件),然后关闭集群。
DataProcSparkOperator 任务每次都立即失败并出现以下错误:
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataproc.googleapis.com/v1beta2/projects//regions/global/jobs:submit?alt=json returned "Invalid resource field value in the request.">
看起来好像 URI 是 incorrect/incomplete,但我不确定是什么原因造成的。下面是我的 DAG 的主要部分。所有其他任务都没有错误地执行,唯一的区别是 DAG 在 Composer 上不再是 运行:
default_dag_args = {
'start_date': yesterday,
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': True,
'retries': 0,
'retry_delay': dt.timedelta(seconds=30),
'project_id': models.Variable.get('gcp_project'),
'cluster_name': 'susi-bsm-cluster-{{ ds_nodash }}'
}
def slack():
'''Posts to Slack if the Spark job fails'''
text = ':x: The DAG *{}* broke and I am not smart enough to fix it. Check the StackDriver and DataProc logs.'.format(DAG_NAME)
s.post_slack(SLACK_URI, text)
with DAG(DAG_NAME, schedule_interval='@once',
default_args=default_dag_args) as dag:
# pylint: disable=no-value-for-parameter
delete_existing_parquet = bo.BashOperator(
task_id = 'delete_existing_parquet',
bash_command = 'gsutil rm -r {}/susi/bsm/bsm.parquet'.format(GCS_BUCKET)
)
create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
task_id = 'create_dataproc_cluster',
num_workers = num_workers_override or models.Variable.get('default_dataproc_workers'),
zone = models.Variable.get('gce_zone'),
init_actions_uris = ['gs://cjones-composer-test/susi/susi-bsm-dataproc-init.sh'],
trigger_rule = trigger_rule.TriggerRule.ALL_DONE
)
run_spark_job = dpo.DataProcSparkOperator(
task_id = 'run_spark_job',
main_class = MAIN_CLASS,
dataproc_spark_jars = [MAIN_JAR],
arguments=['{}/susi.conf'.format(CONF_DEST), DATE_CONST]
)
notify_on_fail = po.PythonOperator(
task_id = 'output_to_slack',
python_callable = slack,
trigger_rule = trigger_rule.TriggerRule.ONE_FAILED
)
delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
task_id = 'delete_dataproc_cluster',
trigger_rule = trigger_rule.TriggerRule.ALL_DONE
)
delete_existing_parquet >> create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster >> notify_on_fail
如有任何帮助,我们将不胜感激!
与DataprocClusterCreateOperator
不同,DataProcSparkOperator
不将project_id
作为参数。它从 Airflow 连接中获取(如果不指定 gcp_conn_id
参数,则默认为 google_cloud_default
)。您必须配置您的连接。
在 Composer 中 运行 DAG 时看不到这个的原因是 Composer configures google_cloud_default
连接。
我正在测试我曾经在 Google Composer 上 运行 在本地安装的 Airflow 上没有错误的 DAG。 DAG 启动 Google Dataproc 集群,运行 Spark 作业(位于 GS 存储桶上的 JAR 文件),然后关闭集群。
DataProcSparkOperator 任务每次都立即失败并出现以下错误:
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataproc.googleapis.com/v1beta2/projects//regions/global/jobs:submit?alt=json returned "Invalid resource field value in the request.">
看起来好像 URI 是 incorrect/incomplete,但我不确定是什么原因造成的。下面是我的 DAG 的主要部分。所有其他任务都没有错误地执行,唯一的区别是 DAG 在 Composer 上不再是 运行:
default_dag_args = {
'start_date': yesterday,
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': True,
'retries': 0,
'retry_delay': dt.timedelta(seconds=30),
'project_id': models.Variable.get('gcp_project'),
'cluster_name': 'susi-bsm-cluster-{{ ds_nodash }}'
}
def slack():
'''Posts to Slack if the Spark job fails'''
text = ':x: The DAG *{}* broke and I am not smart enough to fix it. Check the StackDriver and DataProc logs.'.format(DAG_NAME)
s.post_slack(SLACK_URI, text)
with DAG(DAG_NAME, schedule_interval='@once',
default_args=default_dag_args) as dag:
# pylint: disable=no-value-for-parameter
delete_existing_parquet = bo.BashOperator(
task_id = 'delete_existing_parquet',
bash_command = 'gsutil rm -r {}/susi/bsm/bsm.parquet'.format(GCS_BUCKET)
)
create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
task_id = 'create_dataproc_cluster',
num_workers = num_workers_override or models.Variable.get('default_dataproc_workers'),
zone = models.Variable.get('gce_zone'),
init_actions_uris = ['gs://cjones-composer-test/susi/susi-bsm-dataproc-init.sh'],
trigger_rule = trigger_rule.TriggerRule.ALL_DONE
)
run_spark_job = dpo.DataProcSparkOperator(
task_id = 'run_spark_job',
main_class = MAIN_CLASS,
dataproc_spark_jars = [MAIN_JAR],
arguments=['{}/susi.conf'.format(CONF_DEST), DATE_CONST]
)
notify_on_fail = po.PythonOperator(
task_id = 'output_to_slack',
python_callable = slack,
trigger_rule = trigger_rule.TriggerRule.ONE_FAILED
)
delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
task_id = 'delete_dataproc_cluster',
trigger_rule = trigger_rule.TriggerRule.ALL_DONE
)
delete_existing_parquet >> create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster >> notify_on_fail
如有任何帮助,我们将不胜感激!
与DataprocClusterCreateOperator
不同,DataProcSparkOperator
不将project_id
作为参数。它从 Airflow 连接中获取(如果不指定 gcp_conn_id
参数,则默认为 google_cloud_default
)。您必须配置您的连接。
在 Composer 中 运行 DAG 时看不到这个的原因是 Composer configures google_cloud_default
连接。