尝试使用 DataProcSparkOperator 任务加载 DAG 的 AttributeError

AttributeError trying to load a DAG with DataProcSparkOperator tasks

我编写了一个 DAG 来在 DataProc 集群上执行多个 spark 任务。这个 DAG 过去没有任何改变就可以工作,但我不得不删除并重新安装 Airflow。现在,当启动网络服务器时,我收到以下错误:

AttributeError: 'DataProcSparkOperator' object has no attribute 'dataproc_spark_jars'

文档表明此对象确实具有此属性(我可以证明,因为它过去工作正常),我不确定我需要做什么才能让它再次工作。

这是其中一项任务:

run_spark_job = dpo.DataProcSparkOperator(
            task_id = 'run_spark_job',
            main_class = main_class,
            dataproc_spark_jars = [main_jar],
            arguments=['--prop-file', '{}/{}'.format(conf_dest, conf_name), '-d', '{}'.format(date_param)],
        )

Pypi 上 Airflow 的当前实时版本似乎存在问题 - 在 Airflow 的 GitHub 上,最新版本的 dataproc_operators.py 已删除 dataproc_spark_jars 属性并将其替换为dataproc_jars.

有点笨拙,但我将这个版本的 dataproc_operators.py 复制到我的本地副本,我的问题得到解决(当然是在我的 DAG 中重命名属性之后)