Python Multiprocessing - TypeError: Pickling an AuthenticationString object is disallowed for security reasons

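I'm having the following issue. I want to implement a web crawler. So far it works, but it is too slow, so I tried to use multiprocessing to fetch the URLs. Unfortunately, I'm not very experienced in this area. After some reading, the easiest approach seemed to be the map method of multiprocessing.Pool.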

But I constantly get the following error:

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

I found very few cases with the same error, and unfortunately they did not help me.

I created a stripped-down version of my code that reproduces the error:

import multiprocessing

class TestCrawler:
    def __init__(self):
        self.m = multiprocessing.Manager()
        self.queue = self.m.Queue()
        for i in range(50):
            self.queue.put(str(i))
        self.pool = multiprocessing.Pool(6)



    def mainloop(self):
        self.process_next_url(self.queue)

        while True:
            self.pool.map(self.process_next_url, (self.queue,))                

    def process_next_url(self, queue):
        url = queue.get()
        print(url)


c = TestCrawler()
c.mainloop()

Any help or suggestions would be greatly appreciated!

Question: But I constantly get the following error:

The error you get is misleading. The cause is:

self.queue = self.m.Queue()

Move the Queue instantiation outside of class TestCrawler.
This leads to another error:

NotImplementedError: pool objects cannot be passed between processes or pickled

The cause is:

self.pool = multiprocessing.Pool(6)

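Both errors indicate that pickle cannot serialize these class members. pool.map has to pickle the bound method self.process_next_url, and that pickles the whole TestCrawler instance, including self.m (the Manager, which carries an AuthenticationString) and self.pool.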
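To make the pickling point concrete, here is a minimal sketch that spawns no processes. The names Holder and try_pickle are hypothetical helpers made up for this illustration. It assumes the current process's authkey is an AuthenticationString, the same type of object a Manager holds on to.

```python
import pickle
import multiprocessing


class Holder:
    """Hypothetical stand-in for TestCrawler: one member, one method."""

    def __init__(self, member):
        self.member = member

    def work(self, item):
        return item


def try_pickle(obj):
    """Return "ok" if obj can be pickled, else a short error description."""
    try:
        pickle.dumps(obj)
        return "ok"
    except Exception as exc:
        return "{}: {}".format(type(exc).__name__, exc)


# Pickling a bound method pickles the instance it is bound to.
plain = Holder("just a string")
keyed = Holder(multiprocessing.current_process().authkey)

result_plain = try_pickle(plain.work)   # instance has only picklable members
result_keyed = try_pickle(keyed.work)   # instance carries an AuthenticationString

print(result_plain)
print(result_keyed)
```

The second call should fail with the same TypeError as in the question, although the code only asked to pickle a method.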

Note: Endless loop!
The following while loop never terminates. This will overload your system!
Furthermore, your pool.map(...) call starts only one process with one task!

    while True:
        self.pool.map(self.process_next_url, (self.queue,)) 
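One way to give such a loop an end is to stop as soon as the queue is empty. The sketch below uses the standard-library queue.Queue so it runs without any worker processes. The function name drain and the handle callback are hypothetical. A Manager().Queue() proxy exposes the same get_nowait method, so I would expect the pattern to carry over, but treat that as an assumption and test it in your crawler.

```python
import queue


def drain(tasks, handle):
    """Call handle(url) for every queued item and return how many were handled."""
    handled = 0
    while True:
        try:
            # get_nowait raises queue.Empty instead of blocking forever
            url = tasks.get_nowait()
        except queue.Empty:
            break
        handle(url)
        handled += 1
    return handled


tasks = queue.Queue()
for i in range(5):
    tasks.put(str(i))

seen = []
count = drain(tasks, seen.append)
print(count, seen)
```

Note that an empty queue only means "done" if nothing is still adding URLs. A crawler that discovers new links while running needs a different stop condition.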

I recommend reading the examples in the multiprocessing documentation that demonstrate the use of a pool.


Change your code to the following:

import multiprocessing as mp

class TestCrawler:
    def __init__(self, tasks):
        # Store the globally created queue as a class member
        self.queue = tasks
        for i in range(50):
            self.queue.put(str(i))

    def mainloop(self):
        # Create the pool locally so it is not part of self and never gets pickled
        pool = mp.Pool(6)
        for n in range(50):
            # .map requires an iterable of arguments, so pass a dummy None
            pool.map(self.process_next_url, (None,))

    # Receives the dummy None and takes the URL from the queue instead
    def process_next_url(self, dummy):
        url = self.queue.get()
        print(url)

if __name__ == "__main__":
    # Create the queue globally, outside the class
    tasks = mp.Manager().Queue()
    # Pass the queue to your class TestCrawler
    c = TestCrawler(tasks)
    c.mainloop()

This example starts a pool of 5 processes that together handle 50 tasks (URLs):

import multiprocessing as mp

class TestCrawler2:
    def __init__(self, tasks):
        self.tasks = tasks

    def start(self):
        pool = mp.Pool(5)
        pool.map(self.process_url, self.tasks)

    def process_url(self, url):
        print('self.process_url({})'.format(url))

if __name__ == "__main__":
    tasks = ['url{}'.format(n) for n in range(50)]
    TestCrawler2(tasks).start()

Tested with Python: 3.4.2
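As a variation on the examples above, the worker can be a module-level function instead of a method, so that no instance has to be pickled at all. This is only a sketch: process_url here is a placeholder that returns a string rather than fetching anything, and crawl is a hypothetical helper name. It was not run on the Python version mentioned above.

```python
import multiprocessing as mp


def process_url(url):
    # Placeholder for the real fetch; returning a value lets the parent collect it.
    return "fetched {}".format(url)


def crawl(urls, workers=5):
    # The with block cleans up the pool once map() has returned all results.
    with mp.Pool(workers) as pool:
        return pool.map(process_url, urls)


if __name__ == "__main__":
    results = crawl(["url{}".format(n) for n in range(50)])
    print(len(results), results[0], results[-1])
```

pool.map returns the results in the same order as the input list, so the parent process can match each result to its URL.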