在 pygmo 中使用队列进行函数评估
Using Queues for function evaluation in pygmo
如果有人感兴趣,我正在尝试使用 pygmo 2.5 optimization libraries installed via Anaconda3 with some existing code which wraps the asynchronous and distributed evaluation of parameter vectors via an executable which does trajectory optimization (it's POST2)。为了促进这一点,我在网络上使用 multiprocessing.SyncManager 和 multiprocessing.Queues 来传递输入并接收输出和日志消息。所以在这种情况下,pygmo 会选择要尝试的向量,支持代码会将其传递到输入队列中,一些分布式工作人员会抓取该队列,通过可执行文件进行评估,然后将结果传回,最终将返回给任何人pygmo.algorithm 正在用于评估
我的问题是,当 pygmo 初始化一个问题时,它会对所提供的 class 进行深度复制,在我的例子和下面提供的示例代码中,它包含几个队列。执行深度复制后,我收到错误
File "pygmo_testing.py", line 121, in <module>
main()
File "pygmo_testing.py", line 108, in main
prob = pg.problem(my_prob)
File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
y = copier(x, memo)
File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
rv = reductor(4)
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
有办法解决这个问题吗?我需要为这段代码使用的其他方法保持异步和分布式的执行方式。为了完整性,我还尝试了 queue.Queue 和 multiprocessing.Manager.Queue(两者都不能与其他现有代码一起使用),但它总是归结为深度复制。
谢谢大家!
"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue
"""
****************************** Utility Functions ******************************
"""
def sphere_fitness(x):
return sum(x*x)
def worker(inp_q, out_q):
while True:
x = inp_q.get()
print("got {}".format(x))
if x == False:
break
else:
fit = sphere_fitness(x)
print("x: {} f: {}".format(x, fit))
out_q.put_nowait(fit)
print("submitted {}".format(x))
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
""" Class for pygmo Problem"""
def __init__(self, dim, inp_q, out_q):
self.dim = dim
self._inp_q = inp_q
self._out_q = out_q
def _submit(self, inp_q, x):
self._inp_q.put_nowait(x)
print("x delivered")
def _receive(self, out_q):
return self._out_q.get()
def fitness(self, x):
self._submit(x)
print("put in {}".format(x))
fit = self._receive()
print("got {}".format(fit))
return [fit]
def get_bounds(self):
return ([-1]*self.dim, [1]*self.dim)
def get_name(self):
return "Sphere Function"
def get_extra_info(self):
return "\tDimensions: {}".format(self.dim)
"""
******************************* Main Function ********************************
"""
def main():
# Queues from multiprocessing
_inp_q = Queue()
_out_q = Queue()
_workers = Pool(initializer=worker,
initargs=(_inp_q, _out_q))
_workers.close()
my_prob = distributed_submit(3, _inp_q, _out_q)
prob = pg.problem(my_prob)
algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))
pop = pg.population(prob, 10)
print(pop)
pop = algo.evolve(pop)
print(pop.champion_f)
if __name__ == "__main__":
main()
对于发现此问题并需要答案的任何人 - collections.deque 支持 deepcopy
每https://docs.python.org/3/library/collections.html#collections.deque
In addition to the above, deques support ... copy.copy(d), copy.deepcopy(d),
但是 collections.deque 不是进程感知的,因此不能用于分发
如果有人感兴趣,我正在尝试使用 pygmo 2.5 optimization libraries installed via Anaconda3 with some existing code which wraps the asynchronous and distributed evaluation of parameter vectors via an executable which does trajectory optimization (it's POST2)。为了促进这一点,我在网络上使用 multiprocessing.SyncManager 和 multiprocessing.Queues 来传递输入并接收输出和日志消息。所以在这种情况下,pygmo 会选择要尝试的向量,支持代码会将其传递到输入队列中,一些分布式工作人员会抓取该队列,通过可执行文件进行评估,然后将结果传回,最终将返回给任何人pygmo.algorithm 正在用于评估
我的问题是,当 pygmo 初始化一个问题时,它会对所提供的 class 进行深度复制,在我的例子和下面提供的示例代码中,它包含几个队列。执行深度复制后,我收到错误
File "pygmo_testing.py", line 121, in <module>
main()
File "pygmo_testing.py", line 108, in main
prob = pg.problem(my_prob)
File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
y = copier(x, memo)
File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
rv = reductor(4)
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
有办法解决这个问题吗?我需要为这段代码使用的其他方法保持异步和分布式的执行方式。为了完整性,我还尝试了 queue.Queue 和 multiprocessing.Manager.Queue(两者都不能与其他现有代码一起使用),但它总是归结为深度复制。
谢谢大家!
"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue
"""
****************************** Utility Functions ******************************
"""
def sphere_fitness(x):
return sum(x*x)
def worker(inp_q, out_q):
while True:
x = inp_q.get()
print("got {}".format(x))
if x == False:
break
else:
fit = sphere_fitness(x)
print("x: {} f: {}".format(x, fit))
out_q.put_nowait(fit)
print("submitted {}".format(x))
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
""" Class for pygmo Problem"""
def __init__(self, dim, inp_q, out_q):
self.dim = dim
self._inp_q = inp_q
self._out_q = out_q
def _submit(self, inp_q, x):
self._inp_q.put_nowait(x)
print("x delivered")
def _receive(self, out_q):
return self._out_q.get()
def fitness(self, x):
self._submit(x)
print("put in {}".format(x))
fit = self._receive()
print("got {}".format(fit))
return [fit]
def get_bounds(self):
return ([-1]*self.dim, [1]*self.dim)
def get_name(self):
return "Sphere Function"
def get_extra_info(self):
return "\tDimensions: {}".format(self.dim)
"""
******************************* Main Function ********************************
"""
def main():
# Queues from multiprocessing
_inp_q = Queue()
_out_q = Queue()
_workers = Pool(initializer=worker,
initargs=(_inp_q, _out_q))
_workers.close()
my_prob = distributed_submit(3, _inp_q, _out_q)
prob = pg.problem(my_prob)
algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))
pop = pg.population(prob, 10)
print(pop)
pop = algo.evolve(pop)
print(pop.champion_f)
if __name__ == "__main__":
main()
对于发现此问题并需要答案的任何人 - collections.deque 支持 deepcopy
每https://docs.python.org/3/library/collections.html#collections.deque
In addition to the above, deques support ... copy.copy(d), copy.deepcopy(d),
但是 collections.deque 不是进程感知的,因此不能用于分发