Pytest 功能测试多处理任务队列服务

Pytest function testing multi-processing task queue service

我有一个任务队列处理服务,我正在尝试 运行 pytest 功能测试。当 运行 在 'production' 中使用它时,我从命令行启动它,例如python main.py.

我不知道如何从 pytest 启动这个任务服务来对其进行功能测试。如何在 pytest 中启动服务,以便我可以向其添加作业并查看作业是否得到处理并在完成后添加到数据库中?

def main():
    store = "jobs"
    worker_id = 1
    # Process tasks
    task_processing[store] = multiprocessing.Process(
            target=process_tasks, args=(store, worker_id)
        )
    nanopub_processing[store].start()


if __name__ == "__main__":
    main()

只要确保您正确访问 main 功能即可:

from main import main

def test_main():
    main()
    ...