在 pygmo 中使用队列进行函数评估

Using Queues for function evaluation in pygmo

如果有人感兴趣,我正在尝试使用 pygmo 2.5 optimization libraries installed via Anaconda3 with some existing code which wraps the asynchronous and distributed evaluation of parameter vectors via an executable which does trajectory optimization (it's POST2)。为了促进这一点,我在网络上使用 multiprocessing.SyncManager 和 multiprocessing.Queues 来传递输入并接收输出和日志消息。所以在这种情况下,pygmo 会选择要尝试的向量,支持代码会将其传递到输入队列中,一些分布式工作人员会抓取该队列,通过可执行文件进行评估,然后将结果传回,最终将返回给任何人pygmo.algorithm 正在用于评估

我的问题是,当 pygmo 初始化一个问题时,它会对所提供的 class 进行深度复制,在我的例子和下面提供的示例代码中,它包含几个队列。执行深度复制后,我收到错误

  File "pygmo_testing.py", line 121, in <module>
    main()
  File "pygmo_testing.py", line 108, in main
    prob = pg.problem(my_prob)
  File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
    rv = reductor(4)
  File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

有办法解决这个问题吗?我需要为这段代码使用的其他方法保持异步和分布式的执行方式。为了完整性,我还尝试了 queue.Queue 和 multiprocessing.Manager.Queue(两者都不能与其他现有代码一起使用),但它总是归结为深度复制。

谢谢大家!


"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue

"""
****************************** Utility Functions ******************************
"""  
def sphere_fitness(x):
    return sum(x*x)

def worker(inp_q, out_q):

    while True:

        x = inp_q.get()
        print("got {}".format(x))
        if x == False:
            break
        else:
            fit = sphere_fitness(x)
            print("x: {} f: {}".format(x, fit))
            out_q.put_nowait(fit)
            print("submitted {}".format(x))        
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
    """ Class for pygmo Problem"""

    def __init__(self, dim, inp_q, out_q):
        self.dim = dim
        self._inp_q = inp_q
        self._out_q = out_q

    def _submit(self, inp_q, x):
        self._inp_q.put_nowait(x)
        print("x delivered")

    def _receive(self, out_q):
        return self._out_q.get()

    def fitness(self, x):
        self._submit(x)
        print("put in {}".format(x))
        fit = self._receive()
        print("got {}".format(fit))
        return [fit]

    def get_bounds(self):
        return ([-1]*self.dim, [1]*self.dim)

    def get_name(self):
        return "Sphere Function"

    def get_extra_info(self):
        return "\tDimensions: {}".format(self.dim)


"""
******************************* Main Function ********************************
"""
def main():

    # Queues from multiprocessing
    _inp_q   = Queue()
    _out_q   = Queue()

    _workers = Pool(initializer=worker,
                    initargs=(_inp_q, _out_q))
    _workers.close()

    my_prob = distributed_submit(3, _inp_q, _out_q)

    prob = pg.problem(my_prob)

    algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))

    pop = pg.population(prob, 10)
    print(pop)

    pop = algo.evolve(pop)

    print(pop.champion_f)


if __name__ == "__main__":
    main()

对于发现此问题并需要答案的任何人 - collections.deque 支持 deepcopy

https://docs.python.org/3/library/collections.html#collections.deque

In addition to the above, deques support ... copy.copy(d), copy.deepcopy(d),

但是 collections.deque 不是进程感知的,因此不能用于分发