Python 多处理:如何 运行 从具有列表的下一个元素的一组进程中再次处理一个进程?
Python Multiprocessing : How to run a process again from a set of processes with next element of list?
我有一个包含 table 个名字的列表,假设列表的大小为 n。现在我有 m 个服务器,所以我打开了 m 个游标,每个游标都在另一个列表中。现在对于每个 table 我想调用一个将参数作为这两个列表的函数。
templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]
这些游标作为 cur = conn.cursor() 打开,所以这些是对象
def extract_single(tableName, cursorconn):
qry2 = "Select * FROM %s"% (tableName)
cursorconn.execute(qry2).fetchall()
print " extraction done"
return
现在我已经打开了 5 个进程(因为我有 5 个游标)以便运行它们并行。
processes = []
x = 0
for x in range(5):
new_p = 'p%x'%x
print "process :", new_p
new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
new_p.start()
processes.append(new_p)
for process in processes:
process.join()
所以这确保我为每个游标打开了 5 个进程,并且它使用了前 5 个 table 名称。
现在我希望,一旦 5 个进程中的任何进程完成,它应该立即从我的 templst 中取出第 6 个 table,同样的事情继续直到所有的 templst 完成。
如何针对此行为修改此代码?
例如
举个简单的例子,我想做什么。让我们将 templst 视为一个 int,我想为其调用睡眠函数
templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]
def extract_single(sec, cursorconn):
print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
return
所以当我启动 5 个游标时,睡眠 (1) 或睡眠 (2) 可能先完成
所以它一完成我就想运行用那个光标睡觉(3)。
我真正的查询将取决于游标,因为它将是 SQL 查询
修改方法
考虑之前的睡眠示例。我现在想实现我假设有 10 个游标并且我的睡眠队列按升序或降序排序。
考虑升序排列的列表
现在,在 10 个游标中,前 5 个游标将从队列中获取前 5 个元素,而我的另一组 5 个游标将获取最后 5 个元素。
所以基本上我的游标队列分为两半,其中取最低值,另一半取最高值。
现在,如果前半部分的游标完成,它应该取下一个可用的最低值,如果下半部分的游标完成,那么它应该取第 (n-6) 个值,即从结束开始的第 6 个值。
我需要从两边遍历队列并且有两组游标,每组 5
example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
so cur1 -> 1
cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12
现在 cur1 首先完成,因此需要 6 个(前面的第一个可用元素)
cur2 finsihes 它需要 7 等等
如果 cur 10 finsihes 它将需要 11(后面的下一个可用元素)
依此类推,直到 templst 的所有元素。
将您的 templst
参数(无论是真实示例中的 table 名称还是下例中的休眠秒数)放在多处理队列中。然后每个进程循环读取队列中的下一个项目。当队列为空时,没有更多的工作要执行,您的进程可以 return。您实际上已经实现了自己的进程池,其中每个进程都有自己专用的游标连接。现在你的函数 extract_single
将队列作为第一个参数,从中检索 table 名称或秒参数。
import multiprocessing
import Queue
import time
def extract_single(q, cursorconn):
while True:
try:
sec = q.get_nowait()
print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
except Queue.Empty:
return
def main():
q = multiprocessing.Queue()
templst = [1,2,5,7,4,3,6,8,9,10,11]
for item in templst:
q.put(item) # add items to queue
curlst = [cur1,cur2,cur3,cur4,cur5]
process = []
for i in xrange(5):
p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
process.append(p)
p.start()
for p in process:
p.join()
if __name__ == '__main__':
main()
备注
如果您的处理器少于 5 个,您可以尝试 运行 使用 5 个(或更多)线程,在这种情况下,应使用常规 Queue
对象。
更新问题的更新答案
允许您从队列前端和末尾删除项目的数据结构称为 deque(双端队列)。不幸的是,没有支持多处理的双端队列版本。但我认为您的 table 处理可能与线程一样工作,并且您的计算机不太可能有 10 个处理器来支持 10 个并发进程 运行 无论如何。
import threading
from collections import deque
import time
import sys
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]
def extract_single(cursorconn, from_front):
while True:
try:
sec = q.popleft() if from_front else q.pop()
#print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
sys.stdout.flush() # flush output
time.sleep(sec)
#print " sleeping done"
sys.stdout.write("sleeping done by %s\n" % cursorconn)
sys.stdout.flush() # flush output
except IndexError:
return
def main():
threads = []
for cur in curlst1:
t = threading.Thread(target=extract_single, args=(cur, True))
threads.append(t)
t.start()
for cur in curlst2:
t = threading.Thread(target=extract_single, args=(cur, False))
threads.append(t)
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()
我有一个包含 table 个名字的列表,假设列表的大小为 n。现在我有 m 个服务器,所以我打开了 m 个游标,每个游标都在另一个列表中。现在对于每个 table 我想调用一个将参数作为这两个列表的函数。
templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]
这些游标作为 cur = conn.cursor() 打开,所以这些是对象
def extract_single(tableName, cursorconn):
qry2 = "Select * FROM %s"% (tableName)
cursorconn.execute(qry2).fetchall()
print " extraction done"
return
现在我已经打开了 5 个进程(因为我有 5 个游标)以便运行它们并行。
processes = []
x = 0
for x in range(5):
new_p = 'p%x'%x
print "process :", new_p
new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
new_p.start()
processes.append(new_p)
for process in processes:
process.join()
所以这确保我为每个游标打开了 5 个进程,并且它使用了前 5 个 table 名称。 现在我希望,一旦 5 个进程中的任何进程完成,它应该立即从我的 templst 中取出第 6 个 table,同样的事情继续直到所有的 templst 完成。
如何针对此行为修改此代码? 例如 举个简单的例子,我想做什么。让我们将 templst 视为一个 int,我想为其调用睡眠函数
templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]
def extract_single(sec, cursorconn):
print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
return
所以当我启动 5 个游标时,睡眠 (1) 或睡眠 (2) 可能先完成 所以它一完成我就想运行用那个光标睡觉(3)。
我真正的查询将取决于游标,因为它将是 SQL 查询
修改方法 考虑之前的睡眠示例。我现在想实现我假设有 10 个游标并且我的睡眠队列按升序或降序排序。 考虑升序排列的列表 现在,在 10 个游标中,前 5 个游标将从队列中获取前 5 个元素,而我的另一组 5 个游标将获取最后 5 个元素。 所以基本上我的游标队列分为两半,其中取最低值,另一半取最高值。 现在,如果前半部分的游标完成,它应该取下一个可用的最低值,如果下半部分的游标完成,那么它应该取第 (n-6) 个值,即从结束开始的第 6 个值。
我需要从两边遍历队列并且有两组游标,每组 5
example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
so cur1 -> 1
cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12
现在 cur1 首先完成,因此需要 6 个(前面的第一个可用元素) cur2 finsihes 它需要 7 等等 如果 cur 10 finsihes 它将需要 11(后面的下一个可用元素)
依此类推,直到 templst 的所有元素。
将您的 templst
参数(无论是真实示例中的 table 名称还是下例中的休眠秒数)放在多处理队列中。然后每个进程循环读取队列中的下一个项目。当队列为空时,没有更多的工作要执行,您的进程可以 return。您实际上已经实现了自己的进程池,其中每个进程都有自己专用的游标连接。现在你的函数 extract_single
将队列作为第一个参数,从中检索 table 名称或秒参数。
import multiprocessing
import Queue
import time
def extract_single(q, cursorconn):
while True:
try:
sec = q.get_nowait()
print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
except Queue.Empty:
return
def main():
q = multiprocessing.Queue()
templst = [1,2,5,7,4,3,6,8,9,10,11]
for item in templst:
q.put(item) # add items to queue
curlst = [cur1,cur2,cur3,cur4,cur5]
process = []
for i in xrange(5):
p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
process.append(p)
p.start()
for p in process:
p.join()
if __name__ == '__main__':
main()
备注
如果您的处理器少于 5 个,您可以尝试 运行 使用 5 个(或更多)线程,在这种情况下,应使用常规 Queue
对象。
更新问题的更新答案
允许您从队列前端和末尾删除项目的数据结构称为 deque(双端队列)。不幸的是,没有支持多处理的双端队列版本。但我认为您的 table 处理可能与线程一样工作,并且您的计算机不太可能有 10 个处理器来支持 10 个并发进程 运行 无论如何。
import threading
from collections import deque
import time
import sys
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]
def extract_single(cursorconn, from_front):
while True:
try:
sec = q.popleft() if from_front else q.pop()
#print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
sys.stdout.flush() # flush output
time.sleep(sec)
#print " sleeping done"
sys.stdout.write("sleeping done by %s\n" % cursorconn)
sys.stdout.flush() # flush output
except IndexError:
return
def main():
threads = []
for cur in curlst1:
t = threading.Thread(target=extract_single, args=(cur, True))
threads.append(t)
t.start()
for cur in curlst2:
t = threading.Thread(target=extract_single, args=(cur, False))
threads.append(t)
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()