
How to use queues with "with" context manager

I'm currently trying to create a context manager that works with a queue.

import contextlib
import queue
import random
import time
from threading import Thread

import requests
from loguru import logger

list_op_proxy = [
    "http://test.io:12345",
    "http://test.io:123456",
]

proxy_q = queue.Queue()

for i in list_op_proxy:
    proxy_q.put(i)

session = requests.Session()


class QueuePut(contextlib.AbstractContextManager):

    def __enter__(self):
        self.proxy = proxy_q.get(block=True)
        return self.proxy

    def __exit__(self, excType, excValue, traceback):
        proxy_q.put(self.proxy)


def handler(name):
    while True:
        with QueuePut() as proxy:
            try:
                # Pass the proxy per request: assigning the shared session.proxies
                # from several threads lets one thread's request use another's proxy.
                # The queued strings already include the "http://" scheme.
                proxies = {'https': proxy}
                logger.info(f"{name} | Proxy in use: {proxy}")

                with session.get("https://whosebug.com", proxies=proxies):
                    logger.info(f"{name} - Yay request made!")

                time.sleep(random.randint(5, 10))

            except requests.RequestException as err:
                logger.debug(f"Error! Lets try again! {err}")
                continue  # QueuePut.__exit__ still runs and returns the proxy

            except Exception as err:
                logger.exception(f"Error! Lets debug! {err}")
                raise  # re-raise the original exception unchanged


for i in range(5):
    Thread(target=handler, args=(f'Thread {i}',)).start()

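Because QueuePut is a class and all the threads share the queue, I suspect that every call to __enter__ replaces self.proxy with whichever proxy was fetched most recently by any thread. If so, that could be a problem: some proxies might never be put back into the queue.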
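Each `with QueuePut() as proxy:` creates a new `QueuePut` instance, so `self.proxy` is stored on that instance only and another `__enter__` call cannot overwrite it. A small single-threaded sketch (using the placeholder proxy strings from the question) that keeps two instances alive at the same time:

```python
import contextlib
import queue

proxy_q = queue.Queue()
for p in ["http://test.io:12345", "http://test.io:123456"]:
    proxy_q.put(p)


class QueuePut(contextlib.AbstractContextManager):
    def __enter__(self):
        self.proxy = proxy_q.get(block=True)  # stored on this instance only
        return self.proxy

    def __exit__(self, exc_type, exc_value, traceback):
        proxy_q.put(self.proxy)


outer_cm = QueuePut()
with outer_cm as outer:
    inner_cm = QueuePut()
    with inner_cm as inner:
        # Two live instances, each holding its own proxy.
        print(outer_cm.proxy, inner_cm.proxy)
    print(proxy_q.qsize())  # 1: only the inner proxy has been returned so far

print(proxy_q.qsize())  # 2: both proxies are back
```

The same holds across threads: each thread's `with` statement builds its own instance, so the only shared state is the queue itself.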

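What I want to achieve: when we use QueuePut, it should take a proxy from the queue for use in handler(), and whether the request succeeds or raises an exception, it should "automatically" put the proxy back into the queue.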

How can I do this with a context manager and a queue?
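The question's class already gives this guarantee: a `with` statement calls `__exit__` when the block finishes, when it is left through `continue`, and when an exception escapes it; because `__exit__` returns `None`, the exception still propagates afterwards. An equivalent, slightly shorter sketch uses `contextlib.contextmanager` with `try`/`finally`. The name `borrowed_proxy` is hypothetical, and the failure is simulated instead of making a real request:

```python
import contextlib
import queue

proxy_q = queue.Queue()
for p in ["http://test.io:12345", "http://test.io:123456"]:
    proxy_q.put(p)


@contextlib.contextmanager
def borrowed_proxy(q):
    """Take a proxy from q and always put it back when the with block ends."""
    proxy = q.get(block=True)
    try:
        yield proxy
    finally:
        # Runs on normal exit, on continue/break/return, and when the body raises.
        q.put(proxy)


# Normal exit: the proxy goes back.
with borrowed_proxy(proxy_q) as proxy:
    print("using", proxy)

# The body raises: the proxy still goes back, then the error propagates.
try:
    with borrowed_proxy(proxy_q) as proxy:
        raise ConnectionError(f"simulated failure via {proxy}")
except ConnectionError as err:
    print("caught:", err)

print(proxy_q.qsize())  # 2
```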

Edit after comments:

2021-07-07 12:34:00.952 | INFO     | __main__:handler:41 - Thread 0 | Proxy in use: http://test.io:12345
2021-07-07 12:34:00.952 | INFO     | __main__:handler:41 - Thread 1 | Proxy in use: http://test.io:123456
2021-07-07 12:34:03.211 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB593A0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:03.211 | INFO     | __main__:handler:41 - Thread 2 | Proxy in use: http://test.io:12345
2021-07-07 12:34:05.465 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59580>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:05.465 | INFO     | __main__:handler:41 - Thread 1 | Proxy in use: http://test.io:123456
2021-07-07 12:34:07.718 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59940>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:07.719 | INFO     | __main__:handler:41 - Thread 4 | Proxy in use: http://test.io:12345
2021-07-07 12:34:09.973 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59520>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:09.973 | INFO     | __main__:handler:41 - Thread 1 | Proxy in use: http://test.io:123456
2021-07-07 12:34:12.227 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59A90>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:12.227 | INFO     | __main__:handler:41 - Thread 4 | Proxy in use: http://test.io:12345
2021-07-07 12:34:14.479 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB595B0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:14.480 | INFO     | __main__:handler:41 - Thread 1 | Proxy in use: http://test.io:123456
2021-07-07 12:34:16.733 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB598E0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:16.733 | INFO     | __main__:handler:41 - Thread 4 | Proxy in use: http://test.io:12345
2021-07-07 12:34:18.986 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB599A0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:18.987 | INFO     | __main__:handler:41 - Thread 1 | Proxy in use: http://test.io:123456
2021-07-07 12:34:21.241 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB598B0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:21.241 | INFO     | __main__:handler:41 - Thread 4 | Proxy in use: http://test.io:12345
2021-07-07 12:34:23.493 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59790>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:23.494 | INFO     | __main__:handler:41 - Thread 0 | Proxy in use: http://test.io:123456
2021-07-07 12:34:25.747 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59940>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:25.748 | INFO     | __main__:handler:41 - Thread 3 | Proxy in use: http://test.io:12345
2021-07-07 12:34:28.000 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59580>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:28.000 | INFO     | __main__:handler:41 - Thread 0 | Proxy in use: http://test.io:123456
2021-07-07 12:34:30.253 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB598B0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:30.253 | INFO     | __main__:handler:41 - Thread 1 | Proxy in use: http://test.io:12345
2021-07-07 12:34:32.507 | INFO     | __main__:handler:49 - Error! Lets try again! HTTPSConnectionPool(host='whosebug.com', port=443): Max retries exceeded with url: / (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x000002BD1CB59490>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')))
2021-07-07 12:34:32.507 | INFO     | __main__:handler:41 - Thread 0 | Proxy in use: http://test.io:123456

Generally, the same proxy can be handed out twice in a row.
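Getting the same proxy twice in a row does not by itself mean a proxy was lost. With only two proxies and the FIFO order of `queue.Queue`, if one proxy stays checked out during a long request, every other request can only receive the remaining one. A deterministic single-threaded sketch of that interleaving (the "workers" are simulated by plain calls):

```python
import queue

A, B = "http://test.io:12345", "http://test.io:123456"
proxy_q = queue.Queue()  # FIFO: items come out in the order they were put
proxy_q.put(A)
proxy_q.put(B)

handed_out = []

first = proxy_q.get()   # worker 1 takes A
second = proxy_q.get()  # worker 2 takes B and is still busy with it
handed_out += [first, second]

proxy_q.put(first)      # worker 1 returns A; A is now the only queued proxy
third = proxy_q.get()   # the next request can only receive A
handed_out.append(third)

proxy_q.put(third)      # returned again while B is still checked out
fourth = proxy_q.get()  # A once more: the same proxy twice in a row
handed_out.append(fourth)

proxy_q.put(fourth)
proxy_q.put(second)     # worker 2 finally returns B

print(handed_out)
print(proxy_q.qsize())  # 2: nothing was lost
```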

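Just to make sure that the same proxy is not being handed out twice in a row in your case, modify QueuePut as follows, purely for debugging. If you do not get an assertion error, revert to your original class definition.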

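from threading import Lock


class QueuePut(contextlib.AbstractContextManager):
    lock = Lock()
    last_proxy_obtained = None  # shared by all instances (class attribute)

    def __enter__(self):
        self.proxy = proxy_q.get(block=True)
        with self.lock:
            assert self.proxy != QueuePut.last_proxy_obtained
            # Assign on the class: `self.last_proxy_obtained = ...` would only create
            # an attribute on this short-lived instance, so the check would never fire.
            QueuePut.last_proxy_obtained = self.proxy
        return self.proxy

    def __exit__(self, excType, excValue, traceback):
        proxy_q.put(self.proxy)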

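The code above assumes that a thread that has just executed self.proxy = proxy_q.get(block=True) is not suspended before it executes the following with self.lock: statement. If that assumption fails, you could get an assertion error even though the program is working correctly. So if you do get an assertion error, rerun with the following version, which allows only one thread at a time to request a proxy: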

from threading import Lock


class QueuePut(contextlib.AbstractContextManager):
    lock = Lock()
    last_proxy_obtained = None  # shared by all instances (class attribute)

    def __enter__(self):
        # Hold the lock across get() so only one thread at a time obtains a proxy.
        with self.lock:
            self.proxy = proxy_q.get(block=True)
            assert self.proxy != QueuePut.last_proxy_obtained
            QueuePut.last_proxy_obtained = self.proxy  # update the class attribute
        return self.proxy

    def __exit__(self, excType, excValue, traceback):
        proxy_q.put(self.proxy)
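Comparing consecutive hand-outs with `last_proxy_obtained` can, as the answer notes, give false alarms depending on thread scheduling. A timing-independent alternative, offered as a sketch rather than the answer's method, is to track which proxies are currently checked out and fail only when a proxy is handed out while another holder still has it. `CheckedQueuePut` is a hypothetical name; the demo deliberately puts a duplicate into the queue to show the check firing (note that `assert` statements are skipped when Python runs with `-O`):

```python
import contextlib
import queue
import threading

A, B = "http://test.io:12345", "http://test.io:123456"
proxy_q = queue.Queue()
for p in (A, B):
    proxy_q.put(p)


class CheckedQueuePut(contextlib.AbstractContextManager):
    """Debug variant of QueuePut: fails if a proxy is handed out while still checked out."""

    lock = threading.Lock()
    checked_out = set()  # class attribute, mutated in place so all instances share it

    def __enter__(self):
        self.proxy = proxy_q.get(block=True)
        with self.lock:
            assert self.proxy not in self.checked_out, f"{self.proxy} handed out twice"
            self.checked_out.add(self.proxy)
        return self.proxy

    def __exit__(self, exc_type, exc_value, traceback):
        # Unmark before putting back; otherwise the next taker could see a stale entry.
        with self.lock:
            self.checked_out.discard(self.proxy)
        proxy_q.put(self.proxy)


# Simulate a bug that puts A into the queue a second time.
proxy_q.put(A)

caught = None
with CheckedQueuePut() as first:          # gets A
    with CheckedQueuePut() as second:     # gets B
        try:
            with CheckedQueuePut():       # gets the duplicate A while A is checked out
                pass
        except AssertionError as err:
            caught = str(err)

print(caught)
```

Unlike the consecutive-repeat check, this one cannot fire just because a thread was suspended between `get()` and acquiring the lock: only the thread holding a proxy can put it back.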

Test program

However, in the following program I would not expect the same proxy to be handed out twice in a row:

import contextlib
import queue
from threading import Lock, Thread

list_op_proxy = [
    "http://test.io:12345",
    "http://test.io:123456",
]

proxy_q = queue.Queue()

for i in list_op_proxy:
    proxy_q.put(i)


class QueuePut(contextlib.AbstractContextManager):
    lock = Lock()
    last_proxy_obtained = None  # shared by all instances (class attribute)

    def __enter__(self):
        self.proxy = proxy_q.get(block=True)
        with self.lock:
            assert self.proxy != QueuePut.last_proxy_obtained
            QueuePut.last_proxy_obtained = self.proxy  # update the class attribute
        return self.proxy

    def __exit__(self, excType, excValue, traceback):
        proxy_q.put(self.proxy)


def handler(name):
    for i in range(100):
        with QueuePut() as proxy:
            print(f"{name} | Proxy in use: {proxy}", flush=True)

for i in range(5):
    Thread(target=handler, args=(f'Thread {i}',)).start()
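The test program starts its threads but never joins them, so it cannot check the final state of the queue. A stricter sketch, assuming the two placeholder proxies from the question and a simulated failure on every tenth iteration instead of a real request, joins all workers and then verifies that each proxy is back in the queue exactly once:

```python
import contextlib
import queue
from threading import Thread

list_op_proxy = [
    "http://test.io:12345",
    "http://test.io:123456",
]

proxy_q = queue.Queue()
for p in list_op_proxy:
    proxy_q.put(p)


class QueuePut(contextlib.AbstractContextManager):
    def __enter__(self):
        self.proxy = proxy_q.get(block=True)
        return self.proxy

    def __exit__(self, exc_type, exc_value, traceback):
        proxy_q.put(self.proxy)


def handler(name, iterations=100):
    for i in range(iterations):
        try:
            with QueuePut() as proxy:
                if i % 10 == 0:
                    # Simulated request failure; QueuePut.__exit__ still runs.
                    raise RuntimeError(f"{name}: simulated failure with {proxy}")
        except RuntimeError:
            pass  # a real handler would log the error and retry


threads = [Thread(target=handler, args=(f"Thread {i}",)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every worker before inspecting the queue

# Drain the queue: every proxy should be back exactly once.
returned = [proxy_q.get_nowait() for _ in range(proxy_q.qsize())]
print(sorted(returned) == sorted(list_op_proxy))  # True
```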