为什么我收到多处理的递归错误?

Why am I receiving a Recursion error with multiprocessing?

我希望使用多处理对大量地址进行地理编码。我有以下代码:

import multiprocessing
import geocoder

addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on

def geocode_worker(address):
    return geocoder.arcgis(address)

def main_process():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    return pool.map(geocode_worker, addresses)

if __name__ == '__main__':
    main_process()

但是它给我这个错误:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/anaconda3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 470, in _handle_results
    task = get()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 599, in __getattr__
    if not self.ok:
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 536, in ok
    return len(self) > 0
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 422, in __len__
    return len(self._list)

错误的最后3行一遍遍重复,然后traceback的最后一行是:

RecursionError: maximum recursion depth exceeded while calling a Python object

谁能帮我弄清楚为什么?

问题是 geocoder 编辑的 ArcgisQuery 对象 return 不可腌制 - 或者更确切地说,它不可不可腌制。 unpickle 过程由于使用 __getattr__ 而进入无限循环,它在内部尝试访问 self.ok,最终依赖于要定义的 self._list,而它在 unpickling 时未定义,因为它仅在 __init____init__ is not called while unpickling 中定义。由于未定义,它会尝试使用 __getattr__ 来找到它,这会再次尝试访问 self.ok,并创建无限循环。

您可以通过不在工作进程和主进程之间传递 ArcgisQuery 对象本身来解决这个问题,而是只传递其底层 __dict__。然后,重建主进程中的 ArcgisQuery 个对象:

import multiprocessing
import geocoder
from geocoder.arcgis import ArcgisQuery

addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on

def geocode_worker(address):
    out = geocoder.arcgis(address)
    return out.__dict__ # Only return the object's __dict__

def main_process():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    l = pool.map(geocode_worker, addresses)
    out = []
    for d in l:
        q = ArcgisQuery(d['location'])  # location is a required constructor arg
        q.__dict__.update(d)  # Load the rest of our state into the new object
        out.append(q)
    return out

if __name__ == '__main__':
    print(main_process())

如果您实际上不需要整个 ArcgisQuery 对象,而只需要它的某些部分,您也可以只 return 那些来自工作进程的对象,以避免需要这个破解。

就其价值而言,看起来 geocoder 可以通过在 ArcgisQuery 或其基础 class 上实施 __getstate____setstate__ 来解决其酸洗问题,如下所示:

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, state):
        self.__dict__.update(state)