将 Dagster 与 Django 集成

Integrating Dagster with Django

您好,我正在尝试将 Dagster 集成到正在进行的 Django 项目中。我有点难以为 Dagster 提供 Django 上下文(模型、应用程序等)。截至目前,我只是检查 dagit 是否存在于正在使用的应用程序的 init.py 中的 sys.argv[0] 中匕首.

<!-- language: python -->
## project/app/__init__.py
import sys

import django

if 'dagit-cli' in sys.argv[0]:
    print('dagit')
    django.setup()

谁能帮我设置一下?

据我所知,没有高级集成,但我发现了一些从 Django 视图启动管道的解决方法:

  • 通过从 python 代码调用包函数 execute_pipeline(<your_pipeline>) 同步地 运行 管道,例如在 View 中(即使它是synchronous/asynchronous 范式毫无意义)

  • 创建自定义管理命令并使用 django.core.management.call_command() 挂钩从 python 代码中调用它

我也会像 Timothé 所说的那样使用 Django 自定义管理命令在 Django 的上下文中执行函数或访问 ORM。

但是,如果您的用例需要直接访问 Django ORM,则需要执行以下几个步骤:

  1. 将 Django 项目添加到 sys.path,以便 Python 可以找到包。
  2. 设置 Django 环境。
import os
import sys

import django
from dagster import execute_pipeline, pipeline, solid


# Add the project to sys.path, so that Python can find packages
PROJECT_ROOT = os.path.join(os.path.dirname(__file__), 'demo')
sys.path.append(PROJECT_ROOT)

# Set up the Django environment
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
django.setup()


from customers.models import Customer


@solid
def hello_django_orm(context):
    number_of_customers = Customer.objects.count()
    message = f'Found {number_of_customers} customers!'
    context.log.info(message)
    return number_of_customers


@pipeline
def hello_django_orm_pipeline():
    hello_django_orm()


if __name__ == '__main__':
    result = execute_pipeline(hello_django_orm_pipeline)
    assert result.success

查看下面的结果。

请查看完整示例here