Python - 多线程解析和重复代理

Python - parsing with multithreads and repeating proxy

描述:我尝试解析大量数据,但当具有相同 IP 地址的两个线程同时工作时,我从服务器收到错误消息。我的代理数量不足以正面解决问题。

问题: 我如何调用线程重复列表中的代理,但检查代理是否繁忙并让空闲的代理工作?

我想要的:我的期望是模块"concurrent.futures.ThreadPoolExecutor"给他一个代理列表,以便他重复它并检查是否忙碌。

我尝试了什么: 现在我填写了整个范围的代理列表 list = list * range // len(list)。我还尝试 select 使用随机选择的代理。

我的代码(制表符插入错误):

def start_threads():
    """Fan out get_items_th over 500 result pages using an 8-thread pool."""
    # NOTE(review): executor.map stops at the shortest iterable, so if
    # proxy_list has fewer than 500 entries only len(proxy_list) jobs run.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(get_items_th, range(500), proxy_list)

def get_items_th(begin, proxy):
    """Fetch one page of market listings through *proxy* and dump the rows
    to ``items_<begin>.csv``.

    Args:
        begin: zero-based page index; the request starts at ``begin * 100``.
        proxy: value passed straight to ``requests.get(proxies=...)`` —
            assumed to be a ``{'http': ..., 'https': ...}`` mapping (TODO confirm).
    """
    items = []
    headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.3'}
    # NOTE(review): requests' timeout is in SECONDS — 15000 is ~4 hours.
    # If 15 s was intended, this should be timeout=15; confirm before changing.
    r = requests.get('https://*.com/?query=&start='+str(begin*100)+'&count=100&search_descriptions=0&sort_column=popular&sort_dir=desc&norender=1', headers=headers, timeout=15000, cookies=cookie, proxies=proxy)
    # Crude rate limiting between requests (author notes it does not help much).
    time.sleep(random.uniform(1.5, 2.5))

    if r.status_code != 200:
        print('Error code '+str(begin)+': '+str(r.status_code))
    else:
        page = json.loads(r.content)

        if page['results'] is not None:
            allItems = page['results']

            for currItem in allItems:
                # '}' is used below as the CSV field separator, so strip it
                # from the free-text hash_name to keep rows parseable.
                hash_name = currItem['hash_name'].replace('}', '')
                app_name = currItem['app_name']
                asset_description = currItem['asset_description']
                appid = str(asset_description['appid'])
                classid = str(asset_description['classid'])
                tradable = str(asset_description['tradable'])

                # Skip entries whose name contains HTML (server error pages
                # sometimes leak into results).
                if '<body>' not in hash_name:
                    items.append(classid+'}'+appid+'}'+app_name+'}'+hash_name+'}'+tradable+'\n')

            # `with` guarantees the file is closed even if a write fails
            # (original used bare open()/close(), leaking on exceptions).
            with open('items_'+str(begin)+'.csv', 'w', encoding='utf8') as g:
                g.writelines(items)
        else:
            # BUG FIX: r.content is bytes; bytes + '\n' (str) raised TypeError
            # in the original. Decode before concatenating.
            print(r.content.decode('utf8', errors='replace')+'\n')

附加问题:最大线程数=处理器线程数?我想尽快完成任务。

检查图片:image_file

更新文件:test3.7z - run.py

您可以创建一个可用作上下文管理器的代理 class,为其定义 `__enter__` 和 `__exit__` 方法,然后就可以通过 "with" 语句来使用它。

import threading

# Demo mapping of proxy name -> proxy address (placeholder values).
PROXIES = {f"PROXY{i}": str(i) for i in range(1, 6)}

class Proxy():
    """A proxy endpoint that can be exclusively checked out by one thread.

    Instances register themselves in the class-level ``_Proxies`` list.
    ``Get_Free_Proxy`` returns a free proxy marked busy; using the result
    as a context manager (``with``) releases it on exit.
    """
    _Proxies = list()                 # registry of every constructed proxy
    cls_lock = threading.Lock()
    # Condition built on cls_lock lets waiting threads sleep until a proxy
    # is released, instead of the original hot spin loop that burned a CPU
    # core while holding the lock.
    _cond = threading.Condition(cls_lock)

    def __init__(self, name, proxy):
        self.free = True              # True while no thread has checked us out
        self.name = name
        self.proxy = proxy            # the actual proxy address/config
        self.__class__._Proxies.append(self)

    @classmethod
    def Get_Free_Proxy(cls):
        """Block until some proxy is free, mark it busy, and return it.

        Returns:
            Proxy: the first free proxy in registration order.
        """
        with cls._cond:
            while True:
                for proxy in cls._Proxies:
                    if proxy.free:
                        proxy.free = False
                        return proxy
                # Nothing free: release the lock and sleep until __exit__
                # notifies us (original busy-waited here, pegging a core).
                cls._cond.wait()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Release under the lock and wake one waiter in Get_Free_Proxy.
        with self.__class__._cond:
            self.free = True
            self.__class__._cond.notify()

# Register one Proxy object for each configured endpoint.
for proxy_name, proxy_addr in PROXIES.items():
    Proxy(proxy_name, proxy_addr)

# Demo: check out one proxy and show that only it is marked busy.
# BUG FIX: the original used `for Proxy in Proxy._Proxies`, which rebound the
# module-level name `Proxy` from the class to a proxy instance — shadowing
# the class for all later code. The loop variable is renamed to `p`.
with Proxy.Get_Free_Proxy() as locked_proxy:
    print(locked_proxy)
    for p in Proxy._Proxies:
        print(p.name, p.free)

print()

# After the with-block exits, every proxy is free again.
for p in Proxy._Proxies:
    print(p.name, p.free)

这将打印:

PROXY1 False
PROXY2 True
PROXY3 True
PROXY4 True
PROXY5 True

PROXY1 True
PROXY2 True
PROXY3 True
PROXY4 True
PROXY5 True

然后你可以修改你的代码:

def start_threads():
    """Dispatch 500 page-fetch jobs across an 8-thread pool; each worker
    checks out its own proxy inside get_items_th."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        pool.map(get_items_th, range(500))

def get_items_th(begin):
    # Check out an exclusive proxy for the lifetime of this request; the
    # with-block releases it (sets free=True) even if the request raises.
    with Proxy.Get_Free_Proxy() as locked_proxy:
        items=[]
        headers={'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.3'}
        # NOTE(review): requests' `proxies` expects a mapping like
        # {'http': ..., 'https': ...}; passing the Proxy wrapper object itself
        # probably needs `proxies=locked_proxy.proxy` instead — confirm.
        # NOTE(review): timeout is in seconds, so 15000 is ~4 hours — confirm.
        r = requests.get('https://*.com/?query=&start='+str(begin*100)+'&count=100&search_descriptions=0&sort_column=popular&sort_dir=desc&norender=1', headers=headers,timeout=15000, cookies=cookie, proxies=locked_proxy)
        .
        .
        .