Python: 使用 celery 在多个服务器上处理参数列表
Python: Processing list of arguments on several servers with celery
我正在尝试使用 Celery 处理输入列表。我只想处理每个输入一次。问题是我的服务器都是超级计算机集群的一部分。我可以向每个服务器发送一个命令来启动一个进程。一旦该服务器被安排为我的用户名执行工作(在未来的某个随机时间发生),它将启动该过程(因此服务器的数量 运行ning 在任何时候都是不确定的)。我希望所有为我的用户名执行工作的服务器共享可用工作,直到完成所有必需的工作。
不过,我对如何准确安排这件事感到困惑。
这是我的 app.py
,其中概述了服务器要执行的任务:
from celery import Celery
app = Celery('tasks',
backend='redis://localhost:6379/0',
broker='redis://localhost:6379/0')
@app.task
def add(x, y):
with open('results.txt', 'a') as out:
out.write(str(x + y) + '\n')
这是安排工作的脚本 (worker.py
):
'''Worker node; executes tasks outlined in app.py'''
from app import add
# run the add function and pass in arguments
for i in range(10000):
result = add.apply_async(args=[1,i]).get()
在我的本地机器上,如果我在终端中 运行 celery worker -l info -A app
,那将启动 celery 应用程序。如果我然后 运行 worker.py
,我会看到正在搅拌的工作。
如何让多个不同的主机消费未完成的任务?每台服务器都可以访问 Redis 所在的静态 IP 运行。我是否向每个主机提交 celery worker -l info -A app
命令?如果是这样,每个主机是否会在未完成的工作上线时神奇地消费?如果其他人可以就这些高级问题提供任何帮助,我将不胜感激!
为了回答上面的问题,我创建了一个名为 app.py
的文件并将其加载到我可以通过 ssh 访问的前端节点。该文件概述了各个服务器上的各个工作人员将处理的功能:
from celery import Celery
app = Celery('tasks',
backend='redis://daccssfe.crc.nd.edu:6379/0',
broker='redis://daccssfe.crc.nd.edu:6379/0')
@app.task
def log(*args):
# have all workers write their results to a common outfile
with open('/scratch365/dduhaime/celery-test.txt', 'a') as out:
out.write('-'.join([str(i).strip() for i in args]) + '\n')
接下来我定义了一个函数 schedule_work.py
来安排要完成的工作:
'''Worker node; executes tasks outlined in app.py'''
from app import log
# run the add function and pass in arguments
for i in range(10000):
print('* processing', i)
result = log.apply_async(args=[str(i)]).get()
此文件创建 10,000 个工作单元,并将每个整数 0:10000-1 传递到工作队列。当工作人员上线时,他们将处理这个队列。
为了添加工人,我使用我大学的超级计算系统创建了 10 个工人,每个工人启动 app.py
文件,这将使工人从堆栈中消耗工作。为此,我使用 Sun Grid Engine 队列系统(我正在使用的超级计算机将其用作作业提交协议),我将以下内容保存在文件 start_workers.sh
:
中
#!/bin/bash
#$ -N celery
#$ -o logs/celery.log
#$ -t 1-10:1
#$ -pe smp 4
#$ -q long
#$ -r y
source ~/.bash_profile
source celery-env/bin/activate
# add a new worker
celery worker -l info -A app
然后我提交了这些工作 (qsub start_workers.sh
),这启动了 10 个工人,每个工人都从待办工作列表中拉取。最后,他们都将他们的主机地址和要完成的工作列表中的参数记录到请求的输出文件中,他们都可以访问这些文件。正如我们在结果文件中看到的那样,10 台工作主机中的不同主机消耗了不同的输入:
# /scratch365/dduhaime/celery-test.txt content
10.32.77.210-0
10.32.77.210-1
10.32.77.132-2
10.32.77.210-3
10.32.77.142-4
10.32.77.132-5
10.32.77.210-6
10.32.77.192-7
10.32.77.116-8
10.32.77.142-9
10.32.77.132-10
...
我正在尝试使用 Celery 处理输入列表。我只想处理每个输入一次。问题是我的服务器都是超级计算机集群的一部分。我可以向每个服务器发送一个命令来启动一个进程。一旦该服务器被安排为我的用户名执行工作(在未来的某个随机时间发生),它将启动该过程(因此服务器的数量 运行ning 在任何时候都是不确定的)。我希望所有为我的用户名执行工作的服务器共享可用工作,直到完成所有必需的工作。
不过,我对如何准确安排这件事感到困惑。
这是我的 app.py
,其中概述了服务器要执行的任务:
from celery import Celery
app = Celery('tasks',
backend='redis://localhost:6379/0',
broker='redis://localhost:6379/0')
@app.task
def add(x, y):
with open('results.txt', 'a') as out:
out.write(str(x + y) + '\n')
这是安排工作的脚本 (worker.py
):
'''Worker node; executes tasks outlined in app.py'''
from app import add
# run the add function and pass in arguments
for i in range(10000):
result = add.apply_async(args=[1,i]).get()
在我的本地机器上,如果我在终端中 运行 celery worker -l info -A app
,那将启动 celery 应用程序。如果我然后 运行 worker.py
,我会看到正在搅拌的工作。
如何让多个不同的主机消费未完成的任务?每台服务器都可以访问 Redis 所在的静态 IP 运行。我是否向每个主机提交 celery worker -l info -A app
命令?如果是这样,每个主机是否会在未完成的工作上线时神奇地消费?如果其他人可以就这些高级问题提供任何帮助,我将不胜感激!
为了回答上面的问题,我创建了一个名为 app.py
的文件并将其加载到我可以通过 ssh 访问的前端节点。该文件概述了各个服务器上的各个工作人员将处理的功能:
from celery import Celery
app = Celery('tasks',
backend='redis://daccssfe.crc.nd.edu:6379/0',
broker='redis://daccssfe.crc.nd.edu:6379/0')
@app.task
def log(*args):
# have all workers write their results to a common outfile
with open('/scratch365/dduhaime/celery-test.txt', 'a') as out:
out.write('-'.join([str(i).strip() for i in args]) + '\n')
接下来我定义了一个函数 schedule_work.py
来安排要完成的工作:
'''Worker node; executes tasks outlined in app.py'''
from app import log
# run the add function and pass in arguments
for i in range(10000):
print('* processing', i)
result = log.apply_async(args=[str(i)]).get()
此文件创建 10,000 个工作单元,并将每个整数 0:10000-1 传递到工作队列。当工作人员上线时,他们将处理这个队列。
为了添加工人,我使用我大学的超级计算系统创建了 10 个工人,每个工人启动 app.py
文件,这将使工人从堆栈中消耗工作。为此,我使用 Sun Grid Engine 队列系统(我正在使用的超级计算机将其用作作业提交协议),我将以下内容保存在文件 start_workers.sh
:
#!/bin/bash
#$ -N celery
#$ -o logs/celery.log
#$ -t 1-10:1
#$ -pe smp 4
#$ -q long
#$ -r y
source ~/.bash_profile
source celery-env/bin/activate
# add a new worker
celery worker -l info -A app
然后我提交了这些工作 (qsub start_workers.sh
),这启动了 10 个工人,每个工人都从待办工作列表中拉取。最后,他们都将他们的主机地址和要完成的工作列表中的参数记录到请求的输出文件中,他们都可以访问这些文件。正如我们在结果文件中看到的那样,10 台工作主机中的不同主机消耗了不同的输入:
# /scratch365/dduhaime/celery-test.txt content
10.32.77.210-0
10.32.77.210-1
10.32.77.132-2
10.32.77.210-3
10.32.77.142-4
10.32.77.132-5
10.32.77.210-6
10.32.77.192-7
10.32.77.116-8
10.32.77.142-9
10.32.77.132-10
...