如何分配线程可用的值

How to assign values that are available to threads

我目前正在研究一个 scraper,我试图弄清楚如何分配可用的代理,这意味着如果我使用 5 个线程并且线程 1 使用代理 A,则不应有其他线程能够访问代理 A,应该尝试随机化所有可用的代理池。

import random
import time
from threading import Thread

import requests

list_op_proxy = [
    "http://test.io:12345",
    "http://test.io:123456",
    "http://test.io:1234567",
    "http://test.io:12345678"
]

session = requests.Session()


def handler(name):
    while True:
        try:
            session.proxies = {
                'https': f'http://{random.choice(list_op_proxy)}'
            }
            with session.get("https://whosebug.com"):
                print(f"{name} - Yay request made!")

            time.sleep(random.randint(5, 10))
        except requests.exceptions as err:
            print(f"Error! Lets try again! {err}")
            continue

        except Exceptions as err:
            print(f"Error! Lets debug! {err}")
            raise Exception


for i in range(5):
    Thread(target=handler, args=(f'Thread {i}',)).start()

我想知道如何创建一种方法来使用可用但未在任何线程中使用的代理,并“阻止”代理无法用于其他线程并在完成后释放?

解决此问题的一种方法是仅使用 global 共享列表,其中包含当前活动的代理或 remove 列表中的代理并在请求完成后读取它们完成的。您不必担心列表上的并发访问,因为 CPython 受到 GIL 的影响。

proxy = random.choice(list_op_proxy)
list_op_proxy.remove(proxy)
session.proxies = {
    'https': f'http://{proxy}'
}
# ... do request

list_op_proxy.append(proxy)

您也可以使用队列执行此操作,然后弹出并添加以提高效率。

使用代理队列

另一种选择是在每次查询之前将代理放入 queueget() 代理中,将其从可用代理中删除,并在请求后返回 put()已经完成了。这是上述列表方法的更有效版本。

首先我们需要初始化代理队列。


proxy_q = queue.Queue()
for proxy in proxies:
    proxy_q.put(proxy)

handler 中,我们然后在执行请求之前从队列中获取代理。我们执行请求并将代理放回队列。
我们正在使用 block=True,这样如果当前没有可用的代理,queue 就会阻塞线程。否则,一旦所有代理都在使用中并且应该获取一个新代理,线程将以 queue.Empty 异常终止。

def handler(name):
    global proxy_q
    while True:
        proxy = proxy_q.get(block=True) # we want blocking behaviour
        # ... do request
        proxy_q.put(proxy)
        # ... response handling can be done after proxy put to not
        # block it longer than required
        # do not forget to define a break condition

使用队列和多处理

首先,您将初始化 manager 并将所有数据放入队列,然后初始化另一个结构来收集您的结果(这里我们初始化一个共享列表)。

manager = multiprocessing.Manager()
q = manager.Queue()
for e in entities:
   q.put(e)
print(q.qsize())
results = manager.list()

您初始化抓取过程:

for proxy in proxies:
    processes.append(multiprocessing.Process(
        target=scrape_function,
        args=(q, results, proxy)
        daemon=True))

然后开始每一个

for w in processes:
    w.start()

最后,你 join 每个进程要确保在子进程完成之前主进程不会终止

for w in processes:
    w.join()

scrape_function 中,您只需一次 get 一项并执行请求。默认配置中的 queue 对象在它为空时会引发 queue.Empty 错误,因此我们使用带有中断条件的无限 while 循环来捕获异常。

def scrape_function(q, results, proxy)
    session = requests.Session()
    session.proxies = {
        'https': f'http://{proxy}'
    }
    while True:
        try:
            request_uri = q.get(block=False)
            with session.get("https://whosebug.com"):
                print(f"{name} - Yay request made!")
                results.append(result)
            time.sleep(random.randint(5, 10))
        except queue.Empty:
            break

每个查询的结果都附加到结果列表中,该列表也在不同进程之间共享。