用户启动的后台进程数量有限

limited number of user-initiated background processes

我需要允许用户提交对非常非常大的作业的请求。我们说的是 100 GB 的内存和 20 小时的计算时间。这让我们公司花了很多钱,所以规定任何时候只能运行ning 2个职位,已经运行ning的2个职位要求新职位将被拒绝(并且用户通知服务器正忙)。

我目前的解决方案使用了 concurrent.futures 的 Executor,并且需要将 Apache 服务器设置为 运行 只有一个进程,从而降低了响应速度(当前用户数非常少,所以暂时可以) .

如果可能的话,我想为此使用 Celery,但我没有在文档中看到任何方法来完成此特定设置。

我如何在 Django 应用程序中 运行 在后台最多执行有限数量的作业,并在作业因服务器繁忙而被拒绝时通知用户?

首先你需要限制你的worker(docs)的并发:

celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>

这将有助于确保您的活动任务不超过 2 个,即使您的代码中存在错误也是如此。

现在您有 2 种方法来实现任务编号验证:

  1. 您可以使用 inspect 获取活动和计划任务的数量:

     from celery import current_app
    
     def start_job():
          inspect = current_app.control.inspect()
          active_tasks = inspect.active() or {}
          scheduled_tasks = inspect.scheduled() or {}
          worker_key = 'celery@%s' % <worker_name>
          worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, [])
          if len(worker_tasks) >= 2:
              raise MyCustomException('It is impossible to start more than 2 tasks.') 
          else:
              my_task.delay()
    
  2. 您可以在数据库中存储当前正在执行的任务数,并根据它验证任务执行。

如果您想扩展您的功能,第二种方法可能更好 - 引入高级用户或不允许一个用户执行 2 个请求。

针对这种特殊情况,我有两种解决方案,一种是 celery 开箱即用的解决方案,另一种是您自己实现的。

  1. 你可以用芹菜工人做这样的事情。特别是,您只创建两个并发=1的工作进程(或者好吧,一个并发=2,但那将是线程,而不是不同的进程),这样,只有两个作业可以异步完成。现在你需要一种方法来在两个工作都被占用时引发异常,然后你使用 inspect, to count the number of active tasks and throw exceptions if required. For implementation, you can checkout this SO post.

您可能还对rate limits感兴趣。

  1. 您可以使用所选的锁定解决方案自行完成所有操作。特别是,确保只有两个进程是 运行 redis(和 redis-py)的一个很好的实现就像下面这样简单。 (考虑到你知道redis,因为你知道celery)

    from redis import StrictRedis
    
    redis = StrictRedis('localhost', '6379')
    locks = ['compute:lock1', 'compute:lock2']
    for key in locks:
        lock = redis.lock(key, blocking_timeout=5)
        acquired = lock.acquire()
        if acquired:
            do_huge_computation()
            lock.release()
            break
        print("Gonna try next possible slot")
    
    if not acquired:
        raise SystemLimitsReached("Already at max capacity !")
    

这样可以确保系统中只能存在两个 运行 进程。第三个进程将在 lock.acquire() 行中阻塞 blocking_timeout 秒,如果锁定成功,则 acquired 为 True,否则为 False 并且你会告诉你的用户等待!

我过去某个时候有过同样的需求,我最终编写的代码类似于上面的解决方案。特别是

  1. 这可能具有最少的竞争条件
  2. 易于阅读
  3. 不依赖于系统管理员,在负载下突然将工作人员的并发性增加一倍并炸毁整个系统。
  4. 您还可以实施每个用户的限制,这意味着每个用户可以同时拥有 2 个 运行 作业,仅通过将锁定密钥从 compute:lock1 更改为 compute:userId:lock1 并相应地更改 lock2。你不能用香草芹菜做这个。

第一

您需要 的第一部分。据他说,“你只创建了两个并发=1的工作进程”。

第二

设置time out for the task waiting in the queue, which is set CELERY_EVENT_QUEUE_TTL and the queue length limit according to

因此,当两个工作运行个作业时,队列中的任务等待10秒或任何你喜欢的时间,任务就会超时。或者如果队列已经完成,新到达的任务将被丢弃。

第三

你需要额外的东西来处理通知 "users when jobs are rejected because the server is busy"。

Dead Letter Exchanges 就是您所需要的。每次任务因队列长度限制或消息超时而失败。 "Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached."

你可以设置"x-dead-letter-exchange"路由到另一个队列,一旦这个队列收到死信消息,你可以向用户发送通知消息。