尝试使用 DataProcSparkOperator 任务加载 DAG 的 AttributeError
AttributeError trying to load a DAG with DataProcSparkOperator tasks
我编写了一个 DAG 来在 DataProc 集群上执行多个 spark 任务。这个 DAG 过去没有任何改变就可以工作,但我不得不删除并重新安装 Airflow。现在,当启动网络服务器时,我收到以下错误:
AttributeError: 'DataProcSparkOperator' object has no attribute 'dataproc_spark_jars'
文档表明此对象确实具有此属性(我可以证明,因为它过去工作正常),我不确定我需要做什么才能让它再次工作。
这是其中一项任务:
run_spark_job = dpo.DataProcSparkOperator(
task_id = 'run_spark_job',
main_class = main_class,
dataproc_spark_jars = [main_jar],
arguments=['--prop-file', '{}/{}'.format(conf_dest, conf_name), '-d', '{}'.format(date_param)],
)
Pypi 上 Airflow 的当前实时版本似乎存在问题 - 在 Airflow 的 GitHub 上,最新版本的 dataproc_operators.py
已删除 dataproc_spark_jars
属性并将其替换为dataproc_jars
.
有点笨拙,但我将这个版本的 dataproc_operators.py
复制到我的本地副本,我的问题得到解决(当然是在我的 DAG 中重命名属性之后)
我编写了一个 DAG 来在 DataProc 集群上执行多个 spark 任务。这个 DAG 过去没有任何改变就可以工作,但我不得不删除并重新安装 Airflow。现在,当启动网络服务器时,我收到以下错误:
AttributeError: 'DataProcSparkOperator' object has no attribute 'dataproc_spark_jars'
文档表明此对象确实具有此属性(我可以证明,因为它过去工作正常),我不确定我需要做什么才能让它再次工作。
这是其中一项任务:
run_spark_job = dpo.DataProcSparkOperator(
task_id = 'run_spark_job',
main_class = main_class,
dataproc_spark_jars = [main_jar],
arguments=['--prop-file', '{}/{}'.format(conf_dest, conf_name), '-d', '{}'.format(date_param)],
)
Pypi 上 Airflow 的当前实时版本似乎存在问题 - 在 Airflow 的 GitHub 上,最新版本的 dataproc_operators.py
已删除 dataproc_spark_jars
属性并将其替换为dataproc_jars
.
有点笨拙,但我将这个版本的 dataproc_operators.py
复制到我的本地副本,我的问题得到解决(当然是在我的 DAG 中重命名属性之后)