用户启动的后台进程数量有限
limited number of user-initiated background processes
我需要允许用户提交对非常非常大的作业的请求。我们说的是 100 GB 的内存和 20 小时的计算时间。这让我们公司花了很多钱,所以规定任何时候只能运行ning 2个职位,已经运行ning的2个职位要求新职位将被拒绝(并且用户通知服务器正忙)。
我目前的解决方案使用了 concurrent.futures 的 Executor,并且需要将 Apache 服务器设置为 运行 只有一个进程,从而降低了响应速度(当前用户数非常少,所以暂时可以) .
如果可能的话,我想为此使用 Celery,但我没有在文档中看到任何方法来完成此特定设置。
我如何在 Django 应用程序中 运行 在后台最多执行有限数量的作业,并在作业因服务器繁忙而被拒绝时通知用户?
首先你需要限制你的worker(docs)的并发:
celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>
这将有助于确保您的活动任务不超过 2 个,即使您的代码中存在错误也是如此。
现在您有 2 种方法来实现任务编号验证:
您可以使用 inspect 获取活动和计划任务的数量:
from celery import current_app
def start_job():
inspect = current_app.control.inspect()
active_tasks = inspect.active() or {}
scheduled_tasks = inspect.scheduled() or {}
worker_key = 'celery@%s' % <worker_name>
worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, [])
if len(worker_tasks) >= 2:
raise MyCustomException('It is impossible to start more than 2 tasks.')
else:
my_task.delay()
您可以在数据库中存储当前正在执行的任务数,并根据它验证任务执行。
如果您想扩展您的功能,第二种方法可能更好 - 引入高级用户或不允许一个用户执行 2 个请求。
针对这种特殊情况,我有两种解决方案,一种是 celery 开箱即用的解决方案,另一种是您自己实现的。
- 你可以用芹菜工人做这样的事情。特别是,您只创建两个并发=1的工作进程(或者好吧,一个并发=2,但那将是线程,而不是不同的进程),这样,只有两个作业可以异步完成。现在你需要一种方法来在两个工作都被占用时引发异常,然后你使用 inspect, to count the number of active tasks and throw exceptions if required. For implementation, you can checkout this SO post.
您可能还对rate limits感兴趣。
您可以使用所选的锁定解决方案自行完成所有操作。特别是,确保只有两个进程是 运行 redis(和 redis-py)的一个很好的实现就像下面这样简单。 (考虑到你知道redis,因为你知道celery)
from redis import StrictRedis
redis = StrictRedis('localhost', '6379')
locks = ['compute:lock1', 'compute:lock2']
for key in locks:
lock = redis.lock(key, blocking_timeout=5)
acquired = lock.acquire()
if acquired:
do_huge_computation()
lock.release()
break
print("Gonna try next possible slot")
if not acquired:
raise SystemLimitsReached("Already at max capacity !")
这样可以确保系统中只能存在两个 运行 进程。第三个进程将在 lock.acquire()
行中阻塞 blocking_timeout 秒,如果锁定成功,则 acquired
为 True,否则为 False 并且你会告诉你的用户等待!
我过去某个时候有过同样的需求,我最终编写的代码类似于上面的解决方案。特别是
- 这可能具有最少的竞争条件
- 易于阅读
- 不依赖于系统管理员,在负载下突然将工作人员的并发性增加一倍并炸毁整个系统。
- 您还可以实施每个用户的限制,这意味着每个用户可以同时拥有 2 个 运行 作业,仅通过将锁定密钥从 compute:lock1 更改为 compute:userId:lock1 并相应地更改 lock2。你不能用香草芹菜做这个。
第一
您需要 的第一部分。据他说,“你只创建了两个并发=1的工作进程”。
第二
设置time out for the task waiting in the queue, which is set CELERY_EVENT_QUEUE_TTL and the queue length limit according to 。
因此,当两个工作运行个作业时,队列中的任务等待10秒或任何你喜欢的时间,任务就会超时。或者如果队列已经完成,新到达的任务将被丢弃。
第三
你需要额外的东西来处理通知 "users when jobs are rejected because the server is busy"。
Dead Letter Exchanges 就是您所需要的。每次任务因队列长度限制或消息超时而失败。 "Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached."
你可以设置"x-dead-letter-exchange"路由到另一个队列,一旦这个队列收到死信消息,你可以向用户发送通知消息。
我需要允许用户提交对非常非常大的作业的请求。我们说的是 100 GB 的内存和 20 小时的计算时间。这让我们公司花了很多钱,所以规定任何时候只能运行ning 2个职位,已经运行ning的2个职位要求新职位将被拒绝(并且用户通知服务器正忙)。
我目前的解决方案使用了 concurrent.futures 的 Executor,并且需要将 Apache 服务器设置为 运行 只有一个进程,从而降低了响应速度(当前用户数非常少,所以暂时可以) .
如果可能的话,我想为此使用 Celery,但我没有在文档中看到任何方法来完成此特定设置。
我如何在 Django 应用程序中 运行 在后台最多执行有限数量的作业,并在作业因服务器繁忙而被拒绝时通知用户?
首先你需要限制你的worker(docs)的并发:
celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>
这将有助于确保您的活动任务不超过 2 个,即使您的代码中存在错误也是如此。
现在您有 2 种方法来实现任务编号验证:
您可以使用 inspect 获取活动和计划任务的数量:
from celery import current_app def start_job(): inspect = current_app.control.inspect() active_tasks = inspect.active() or {} scheduled_tasks = inspect.scheduled() or {} worker_key = 'celery@%s' % <worker_name> worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, []) if len(worker_tasks) >= 2: raise MyCustomException('It is impossible to start more than 2 tasks.') else: my_task.delay()
您可以在数据库中存储当前正在执行的任务数,并根据它验证任务执行。
如果您想扩展您的功能,第二种方法可能更好 - 引入高级用户或不允许一个用户执行 2 个请求。
针对这种特殊情况,我有两种解决方案,一种是 celery 开箱即用的解决方案,另一种是您自己实现的。
- 你可以用芹菜工人做这样的事情。特别是,您只创建两个并发=1的工作进程(或者好吧,一个并发=2,但那将是线程,而不是不同的进程),这样,只有两个作业可以异步完成。现在你需要一种方法来在两个工作都被占用时引发异常,然后你使用 inspect, to count the number of active tasks and throw exceptions if required. For implementation, you can checkout this SO post.
您可能还对rate limits感兴趣。
您可以使用所选的锁定解决方案自行完成所有操作。特别是,确保只有两个进程是 运行 redis(和 redis-py)的一个很好的实现就像下面这样简单。 (考虑到你知道redis,因为你知道celery)
from redis import StrictRedis redis = StrictRedis('localhost', '6379') locks = ['compute:lock1', 'compute:lock2'] for key in locks: lock = redis.lock(key, blocking_timeout=5) acquired = lock.acquire() if acquired: do_huge_computation() lock.release() break print("Gonna try next possible slot") if not acquired: raise SystemLimitsReached("Already at max capacity !")
这样可以确保系统中只能存在两个 运行 进程。第三个进程将在 lock.acquire()
行中阻塞 blocking_timeout 秒,如果锁定成功,则 acquired
为 True,否则为 False 并且你会告诉你的用户等待!
我过去某个时候有过同样的需求,我最终编写的代码类似于上面的解决方案。特别是
- 这可能具有最少的竞争条件
- 易于阅读
- 不依赖于系统管理员,在负载下突然将工作人员的并发性增加一倍并炸毁整个系统。
- 您还可以实施每个用户的限制,这意味着每个用户可以同时拥有 2 个 运行 作业,仅通过将锁定密钥从 compute:lock1 更改为 compute:userId:lock1 并相应地更改 lock2。你不能用香草芹菜做这个。
第一
您需要
第二
设置time out for the task waiting in the queue, which is set CELERY_EVENT_QUEUE_TTL and the queue length limit according to
因此,当两个工作运行个作业时,队列中的任务等待10秒或任何你喜欢的时间,任务就会超时。或者如果队列已经完成,新到达的任务将被丢弃。
第三
你需要额外的东西来处理通知 "users when jobs are rejected because the server is busy"。
Dead Letter Exchanges 就是您所需要的。每次任务因队列长度限制或消息超时而失败。 "Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached."
你可以设置"x-dead-letter-exchange"路由到另一个队列,一旦这个队列收到死信消息,你可以向用户发送通知消息。