带锁的同步 numpy 二维数组计数器
Synchronised numpy 2D array counter with locks
我想并行化一个在共享 numpy 二维数组上运行的方法。
我的原始应用程序是研究的一部分并且非常复杂,但是,我创建了一个基本上复制了问题的玩具示例。
有一家服装店,出售不同尺码和颜色的衣服。我将这家商店的库存表示为一个二维矩阵,其中 self.supply_arr[i][j]
表示 size i
和 color j
的衣服总可用性。我有多个客户试图从商店购买。商店销售的衣服不应超过库存。下面,我展示了一个非平行的例子。
import numpy as np
class ClothStore(object):
def __init__(self, num_customers):
self.supply_arr = np.random.randint(5, size=(2,2))
self.sold_arr = np.zeros((2,2), dtype=int)
self.num_customers = num_customers
def make_purchase(self, size, color):
left = self.supply_arr[size][color] - self.sold_arr[size][color]
if left > 0:
self.sold_arr[size][color] += 1
return True
else:
return False
def run(self):
for customer in xrange(self.num_customers):
size = np.random.randint(2)
color = np.random.randint(2)
purchase = self.make_purchase(size, color)
if purchase:
print "Customer: {} made successful purchase".format(customer)
if __name__ == "__main__":
store = ClothStore(100)
store.run()
print "Supply Arr: {}".format(store.supply_arr)
print "Sold Arr: {}".format(store.sold_arr)
我尝试使用 pathos
并行化 run(self)
方法,并将 self.supply_arr
表示为 np.empty((2,2), dtype=object)
,其中我将每个元素初始化为 multiprocessing.Value
。但是,我无法让它工作。任何帮助,将不胜感激。谢谢。
我用迂回的方式设法解决了我自己的问题。这不是最优雅的方式,但它确实有效。我真的很感激帮助使它更优雅。
import numpy as np
from pathos.multiprocessing import ProcessingPool as Pool
from multiprocess import Manager
class ClothStoreNew(object):
def __init__(self, num_customers):
self.supply_arr = np.random.randint(5, size=(2, 2))
self.num_customers = num_customers
def make_purchase(self, arg):
sold_dict = arg[0]
i = arg[1]
size = self.demand[i][1]
color = self.demand[i][2]
sold = sold_dict.get((size, color), 0)
if self.supply_arr[size][color] > sold:
sold_dict[(size, color)] = sold + 1
def run(self):
m = Manager()
sold_dict = m.dict()
pool = Pool(processes=100)
self.demand = []
for customer in xrange(self.num_customers):
size = np.random.randint(1)
color = np.random.randint(1)
self.demand.append([customer, size, color])
pool.map(self.make_purchase, ([sold_dict, i] for i in xrange(self.num_customers)))
pool.close()
pool.join()
return dict(sold_dict)
if __name__ == "__main__":
store = ClothStoreNew(20)
sold_dict = store.run()
print "Supply Arr: {}".format(store.supply_arr)
print "Sold Dict: {}".format(sold_dict)
如您所见,我正在使用 manager.dict()
进行同步。我想使用 manager.list()
但它似乎不起作用。此外,使用 Manager
为每次更新锁定整个字典,一个理想的解决方案是一次锁定字典的每个单独键(或二维矩阵的每个单独单元格),以便进程在其他细胞不必等待。
我想并行化一个在共享 numpy 二维数组上运行的方法。
我的原始应用程序是研究的一部分并且非常复杂,但是,我创建了一个基本上复制了问题的玩具示例。
有一家服装店,出售不同尺码和颜色的衣服。我将这家商店的库存表示为一个二维矩阵,其中 self.supply_arr[i][j]
表示 size i
和 color j
的衣服总可用性。我有多个客户试图从商店购买。商店销售的衣服不应超过库存。下面,我展示了一个非平行的例子。
import numpy as np
class ClothStore(object):
def __init__(self, num_customers):
self.supply_arr = np.random.randint(5, size=(2,2))
self.sold_arr = np.zeros((2,2), dtype=int)
self.num_customers = num_customers
def make_purchase(self, size, color):
left = self.supply_arr[size][color] - self.sold_arr[size][color]
if left > 0:
self.sold_arr[size][color] += 1
return True
else:
return False
def run(self):
for customer in xrange(self.num_customers):
size = np.random.randint(2)
color = np.random.randint(2)
purchase = self.make_purchase(size, color)
if purchase:
print "Customer: {} made successful purchase".format(customer)
if __name__ == "__main__":
store = ClothStore(100)
store.run()
print "Supply Arr: {}".format(store.supply_arr)
print "Sold Arr: {}".format(store.sold_arr)
我尝试使用 pathos
并行化 run(self)
方法,并将 self.supply_arr
表示为 np.empty((2,2), dtype=object)
,其中我将每个元素初始化为 multiprocessing.Value
。但是,我无法让它工作。任何帮助,将不胜感激。谢谢。
我用迂回的方式设法解决了我自己的问题。这不是最优雅的方式,但它确实有效。我真的很感激帮助使它更优雅。
import numpy as np
from pathos.multiprocessing import ProcessingPool as Pool
from multiprocess import Manager
class ClothStoreNew(object):
def __init__(self, num_customers):
self.supply_arr = np.random.randint(5, size=(2, 2))
self.num_customers = num_customers
def make_purchase(self, arg):
sold_dict = arg[0]
i = arg[1]
size = self.demand[i][1]
color = self.demand[i][2]
sold = sold_dict.get((size, color), 0)
if self.supply_arr[size][color] > sold:
sold_dict[(size, color)] = sold + 1
def run(self):
m = Manager()
sold_dict = m.dict()
pool = Pool(processes=100)
self.demand = []
for customer in xrange(self.num_customers):
size = np.random.randint(1)
color = np.random.randint(1)
self.demand.append([customer, size, color])
pool.map(self.make_purchase, ([sold_dict, i] for i in xrange(self.num_customers)))
pool.close()
pool.join()
return dict(sold_dict)
if __name__ == "__main__":
store = ClothStoreNew(20)
sold_dict = store.run()
print "Supply Arr: {}".format(store.supply_arr)
print "Sold Dict: {}".format(sold_dict)
如您所见,我正在使用 manager.dict()
进行同步。我想使用 manager.list()
但它似乎不起作用。此外,使用 Manager
为每次更新锁定整个字典,一个理想的解决方案是一次锁定字典的每个单独键(或二维矩阵的每个单独单元格),以便进程在其他细胞不必等待。