Python - multithreaded parsing with a reusable proxy list
Description: I am trying to parse a large amount of data, but when two threads with the same IP address work at the same time, I get an error back from the server. I do not have enough proxies to solve the problem head-on.

Question: How can I make the threads reuse the proxies from a list, but check whether a proxy is busy and let only a free one do the work?

What I want: My hope was that concurrent.futures.ThreadPoolExecutor could be handed a list of proxies so that it would cycle through them and check whether each one is busy.

What I tried: Right now I pad the proxy list so it covers the whole range (roughly list = list * (range // len(list))). I also tried selecting a proxy at random.
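For reference, a minimal sketch of that padding approach, assuming proxy_list holds requests-style proxy dicts and a range of 500 tasks (the addresses below are made-up placeholders):

proxy_list = [{'https': 'http://10.0.0.1:8080'},   # placeholder proxies
              {'https': 'http://10.0.0.2:8080'}]
tasks = 500
# Repeat the list until it covers the whole range, then trim it,
# so executor.map receives one proxy per task.
padded_proxies = (proxy_list * (tasks // len(proxy_list) + 1))[:tasks]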
My code (the indentation was mangled when pasting):
import concurrent.futures
import json
import random
import time

import requests

def start_threads():
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(get_items_th, range(500), proxy_list)

def get_items_th(begin, proxy):
    items = []
    headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.3'}
    r = requests.get('https://*.com/?query=&start=' + str(begin * 100) + '&count=100&search_descriptions=0&sort_column=popular&sort_dir=desc&norender=1',
                     headers=headers, timeout=15000, cookies=cookie, proxies=proxy)
    time.sleep(random.uniform(1.5, 2.5))  # not helping, I'm looking for a better option
    if r.status_code != 200:
        print('Error code ' + str(begin) + ': ' + str(r.status_code))
    else:
        page = json.loads(r.content)
        if page['results'] is not None:
            allItems = page['results']
            for currItem in allItems:
                hash_name = currItem['hash_name'].replace('}', '')
                app_name = currItem['app_name']
                asset_description = currItem['asset_description']
                appid = str(asset_description['appid'])
                classid = str(asset_description['classid'])
                tradable = str(asset_description['tradable'])
                if '<body>' not in hash_name:
                    items.append(classid + '}' + appid + '}' + app_name + '}' + hash_name + '}' + tradable + '\n')
            g = open('items_' + str(begin) + '.csv', 'w', encoding='utf8')
            for line in items:
                g.write(line)
            g.close()
        else:
            print(r.content.decode('utf8', 'replace') + '\n')  # r.content is bytes; decode before concatenating
Additional question: should the maximum number of threads equal the number of processor threads? I want to finish the task as quickly as possible.
Updated file: test3.7z - run.py
You can create a proxy class that works as a context manager, with __enter__ and __exit__ methods defined, and then use it with a "with" statement.
import threading

PROXIES = {
    "PROXY1": "1",
    "PROXY2": "2",
    "PROXY3": "3",
    "PROXY4": "4",
    "PROXY5": "5",
}

class Proxy():
    _Proxies = list()
    cls_lock = threading.Lock()

    def __init__(self, name, proxy):
        self.free = True
        self.name = name
        self.proxy = proxy
        self.__class__._Proxies.append(self)

    @classmethod
    def Get_Free_Proxy(cls):
        # Only one thread at a time may claim a proxy; the loop spins
        # until another thread's __exit__ marks one free again.
        with cls.cls_lock:
            while True:
                for proxy in cls._Proxies:
                    if proxy.free:
                        proxy.free = False
                        return proxy

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Release the proxy when the "with" block ends.
        self.free = True

for name, value in PROXIES.items():
    Proxy(name, value)

with Proxy.Get_Free_Proxy() as locked_proxy:
    for p in Proxy._Proxies:   # "p", not "Proxy", to avoid shadowing the class
        print(p.name, p.free)

print()
for p in Proxy._Proxies:
    print(p.name, p.free)
This will print:
PROXY1 False
PROXY2 True
PROXY3 True
PROXY4 True
PROXY5 True

PROXY1 True
PROXY2 True
PROXY3 True
PROXY4 True
PROXY5 True
Then you can modify your code accordingly:
def start_threads():
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(get_items_th, range(500))

def get_items_th(begin):
    with Proxy.Get_Free_Proxy() as locked_proxy:
        items = []
        headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.3'}
        # Pass the underlying proxy value, not the Proxy wrapper object.
        r = requests.get('https://*.com/?query=&start=' + str(begin * 100) + '&count=100&search_descriptions=0&sort_column=popular&sort_dir=desc&norender=1',
                         headers=headers, timeout=15000, cookies=cookie, proxies=locked_proxy.proxy)
        ...
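If you want to avoid the busy-wait inside Get_Free_Proxy, the same pool can be sketched with the standard library's queue.Queue, whose get() blocks until another thread returns a proxy. A minimal sketch reusing the PROXIES placeholders from above; the borrowed_proxy helper is my own illustration, not part of the class shown earlier:

import queue
import threading
from contextlib import contextmanager

proxy_pool = queue.Queue()
for value in PROXIES.values():
    proxy_pool.put(value)

@contextmanager
def borrowed_proxy():
    proxy = proxy_pool.get()      # blocks until a proxy is free
    try:
        yield proxy
    finally:
        proxy_pool.put(proxy)     # hand it back so another thread can use it

def get_items_th(begin):
    with borrowed_proxy() as proxy:
        print(threading.current_thread().name, 'using proxy', proxy, 'for page', begin)
        # ... the requests.get(...) call would go here, with proxies=proxy ...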
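As for the additional question: these requests are network-bound, not CPU-bound, so the worker count does not need to match the number of processor threads; the practical limit is how many concurrent connections the server and your proxies tolerate. For what it's worth, when max_workers is omitted, ThreadPoolExecutor itself defaults to a value tuned for I/O-bound work:

import os

cpu_threads = os.cpu_count() or 1
# ThreadPoolExecutor's default when max_workers is omitted (Python 3.8+).
default_workers = min(32, cpu_threads + 4)
print(default_workers)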