如何在 Databricks 笔记本中使用 Airflow 变量?
How do I use an Airflow variable inside a Databricks notebook?
我有一个从 Airflow DAG 调用的 Databricks PySpark 笔记本。
我通过转到 Admin - Variables 并添加了一个键值对在 Airflow 中创建了一个变量。
我找不到在 Databricks 中使用该 Airflow 变量的方法。
编辑以添加我的代码示例。
notebook_task = {
'notebook_path': '/Users/email@exaple.com/myDAG',
'base_parameters': {
"token": token
}
}
和此处定义的运算符
opr_submit_run = DatabricksSubmitRunOperator(
task_id='run_notebook',
existing_cluster_id='xxxxx',
run_name='test',
databricks_conn_id='databricks_xxx',
notebook_task=notebook_task
)
最终的工作是使用 base_parameters 而不是 notebook_parans,可以在此处找到 https://docs.databricks.com/dev-tools/api/latest/jobs.html
并使用
从数据块访问它
my_param = dbutils.widgets.get("token")
如果你将它设置为笔记本调用的参数(参数在notebook_task
内),那么你需要使用the dbutils.widgets.get function,在笔记本的开头写这样的东西:
my_param = dbutils.widgets.get("key")
扩展 Alex 提供的答案,因为这个问题是在执行数据块笔记本的 Apache-Airflow 的上下文中提出的。
DatabricksRunNowOperator
(由 databricks provider 提供)有 notebook_params
,这是一个从键到值的字典,用于笔记本任务,例如"notebook_params": {"name": "john doe", "age": "35"}
。地图被传递到笔记本,可以通过
dbutils.widgets.get
函数。正如 Alex 所解释的那样,您可以通过以下方式访问数据块笔记本中的值:
my_param = dbutils.widgets.get("key")
示例用法为:
spark_jar_task = DatabricksSubmitRunOperator(
task_id='spark_jar_task',
new_cluster=new_cluster,
notebook_params={"name": "john doe", "age": "35"},
spark_jar_task={'main_class_name': 'com.example.ProcessData'},
libraries=[{'jar': 'dbfs:/lib/etl-0.1.jar'}],
)
现在的问题是如何从 Airflow 变量传递值而不是静态值。为此,我们需要 notebook_params
成为模板化字段,以便 Jinja 引擎将模板化该值。问题是 notebook_params
没有在 template_fields 中列出
为了克服这个问题,我们可以创建一个自定义版本的运算符:
class MyDatabricksRunNowOperator(DatabricksRunNowOperator):
template_fields = DatabricksRunNowOperator.template_fields + ('notebook_params',)
然后我们可以使用 macro {{ var.value.my_var }}
它将在 运行 时间内被模板化为:
spark_jar_task = MyDatabricksSubmitRunOperator(
task_id='spark_jar_task',
new_cluster=new_cluster,
notebook_params={"var_value": {{ var.value.my_var }} },
spark_jar_task={'main_class_name': 'com.example.ProcessData'},
libraries=[{'jar': 'dbfs:/lib/etl-0.1.jar'}],
)
运算符将获取my_var
变量的值并将其传递给您的笔记本。
我有一个从 Airflow DAG 调用的 Databricks PySpark 笔记本。 我通过转到 Admin - Variables 并添加了一个键值对在 Airflow 中创建了一个变量。
我找不到在 Databricks 中使用该 Airflow 变量的方法。
编辑以添加我的代码示例。
notebook_task = {
'notebook_path': '/Users/email@exaple.com/myDAG',
'base_parameters': {
"token": token
}
}
和此处定义的运算符
opr_submit_run = DatabricksSubmitRunOperator(
task_id='run_notebook',
existing_cluster_id='xxxxx',
run_name='test',
databricks_conn_id='databricks_xxx',
notebook_task=notebook_task
)
最终的工作是使用 base_parameters 而不是 notebook_parans,可以在此处找到 https://docs.databricks.com/dev-tools/api/latest/jobs.html
并使用
从数据块访问它my_param = dbutils.widgets.get("token")
如果你将它设置为笔记本调用的参数(参数在notebook_task
内),那么你需要使用the dbutils.widgets.get function,在笔记本的开头写这样的东西:
my_param = dbutils.widgets.get("key")
扩展 Alex 提供的答案,因为这个问题是在执行数据块笔记本的 Apache-Airflow 的上下文中提出的。
DatabricksRunNowOperator
(由 databricks provider 提供)有 notebook_params
,这是一个从键到值的字典,用于笔记本任务,例如"notebook_params": {"name": "john doe", "age": "35"}
。地图被传递到笔记本,可以通过
dbutils.widgets.get
函数。正如 Alex 所解释的那样,您可以通过以下方式访问数据块笔记本中的值:
my_param = dbutils.widgets.get("key")
示例用法为:
spark_jar_task = DatabricksSubmitRunOperator(
task_id='spark_jar_task',
new_cluster=new_cluster,
notebook_params={"name": "john doe", "age": "35"},
spark_jar_task={'main_class_name': 'com.example.ProcessData'},
libraries=[{'jar': 'dbfs:/lib/etl-0.1.jar'}],
)
现在的问题是如何从 Airflow 变量传递值而不是静态值。为此,我们需要 notebook_params
成为模板化字段,以便 Jinja 引擎将模板化该值。问题是 notebook_params
没有在 template_fields 中列出
为了克服这个问题,我们可以创建一个自定义版本的运算符:
class MyDatabricksRunNowOperator(DatabricksRunNowOperator):
template_fields = DatabricksRunNowOperator.template_fields + ('notebook_params',)
然后我们可以使用 macro {{ var.value.my_var }}
它将在 运行 时间内被模板化为:
spark_jar_task = MyDatabricksSubmitRunOperator(
task_id='spark_jar_task',
new_cluster=new_cluster,
notebook_params={"var_value": {{ var.value.my_var }} },
spark_jar_task={'main_class_name': 'com.example.ProcessData'},
libraries=[{'jar': 'dbfs:/lib/etl-0.1.jar'}],
)
运算符将获取my_var
变量的值并将其传递给您的笔记本。