Can't read/write to files using multithreading in Python
I have an input file containing a long list of URLs. Let's say it's mylines.txt:
https://yahoo.com
https://google.com
https://facebook.com
https://twitter.com
What I need to do is:
1. Read a line from the input file mylines.txt.
2. Run the myFunc function on it. It performs some task and produces one line of output. It's more complex in my real code, but that's the idea.
3. Write that output to the results.txt file.
Since my input is very large, I need to use Python multithreading. I looked at this nice post here, but unfortunately it assumes the input is a simple list, and it doesn't cover writing the function's output to a file.
I need to make sure each input's output is written on a single line (i.e., if multiple threads were to write to the same line I would get corrupted data).
I tried fiddling around with it, without success. I haven't used Python multithreading before, but now is the time to learn, as it is unavoidable in my case: I have a very long list that cannot be finished in a reasonable time without it. My real function does far more than this simple task, but the extra operations don't matter for the concept.
Here is my attempt. Please correct me (in the code itself):
import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import Queue

def myFunc(url):
    response = requests.get(url, verify=False, timeout=(2, 5))
    results = open("myresults", "a")  # "a" to append results
    results.write("url is:", url, ", response is:", response.url)
    results.close()

worker_data = open("mylines.txt", "r")  # open my input file.

# load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

# close the pool and wait for the work to finish
pool.close()
pool.join()
Q: How can I fix the above code (please be concise and help me in the code itself) so that it reads a line from the input file, executes the function, and uses Python multithreading to execute the requests concurrently, so I can get through my list in a reasonable time?
Update:
Based on the answer, the code became:
import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import queue
from multiprocessing import Queue

def myFunc(url):
    response = requests.get(url, verify=False, timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

worker_data = open("mylines.txt", "r")  # open my input file.

# load up a queue with your data, this will handle locking
q = queue.Queue(4)
with open("mylines.txt", "r") as f:  # open my input file.
    for url in f:
        q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

with open("myresults", "w") as f:
    for line in results:
        f.write(line + '\n')
mylines.txt contains:
https://yahoo.com
https://www.google.com
https://facebook.com
https://twitter.com
Note that at first I was using:
import Queue
and:
q = Queue.Queue(4)
but I got an error saying:
Traceback (most recent call last):
  File "test3.py", line 4, in <module>
    import Queue
ModuleNotFoundError: No module named 'Queue'
Based on some searching (the Python 2 Queue module was renamed queue in Python 3), I changed it to:
import queue
and changed the relevant line to:
q = queue.Queue(4)
I also added:
from multiprocessing import Queue
but nothing worked. Can an expert in Python multithreading help?
You should change your function to return a string:
def myFunc(url):
    response = requests.get(url, verify=False, timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url
and write those strings to the file later:
results = pool.map(myFunc, q)

with open("myresults", "w") as f:
    for line in results:
        f.write(line + '\n')
This keeps the multithreading working for the requests.get calls, but serializes the writing of the results to the output file.
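Put together, a minimal working sketch of this first approach might look like the following (my sketch, not code from the answer). Note that it maps over a plain list of URLs rather than over q: Pool.map needs an iterable, and a queue.Queue is not iterable, which is one reason the updated code in the question fails.

import requests
from multiprocessing.dummy import Pool as ThreadPool

def myFunc(url):
    response = requests.get(url, timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

# read the URLs into a plain list; Pool.map cannot consume a queue.Queue
with open("mylines.txt", "r") as f:
    urls = [line.strip() for line in f]

pool = ThreadPool(4)              # 4 worker threads
results = pool.map(myFunc, urls)  # runs the requests concurrently, keeps input order
pool.close()
pool.join()

# a single sequential write keeps each result on its own line
with open("myresults", "w") as f:
    for line in results:
        f.write(line + '\n')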
Update:
You should also use with to read the input file:
# load up a queue with your data, this will handle locking
q = Queue.Queue()
with open("mylines.txt", "r") as f:  # open my input file.
    for url in f:
        q.put(url)
Rather than having the worker pool threads print the results out, which cannot guarantee the output is buffered correctly, create one more thread that reads the results from a second Queue and prints them.
I have modified your solution so it builds its own pool of worker threads. There is little point in giving the queue an infinite length, since the main thread blocks when the queue reaches its maximum size: you only need it long enough to make sure the worker threads always have work to process; the main thread will block and unblock as the queue size rises and falls.
It also identifies the thread responsible for each item on the output queue, which should give you some confidence that the multithreaded approach is working, and it prints the response status code from the server. I found I had to strip the newlines from the URLs.
Since only one thread now writes to the file, the writes are always perfectly in sequence and there is no possibility of them interfering with each other.
import threading
import requests
import queue

POOL_SIZE = 4

def myFunc(inq, outq):  # worker thread deals only with queues
    while True:
        url = inq.get()  # Blocks until something available
        if url is None:
            break
        response = requests.get(url.strip(), timeout=(2, 5))
        outq.put((url, response, threading.currentThread().name))

class Writer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.results = open("myresults", "a")  # "a" to append results
        self.queue = q
    def run(self):
        while True:
            url, response, threadname = self.queue.get()
            if response is None:
                self.results.close()
                break
            print("****url is:", url, ", response is:", response.status_code, response.url, "thread", threadname, file=self.results)

# load up a queue with your data, this will handle locking
inq = queue.Queue()  # could usefully limit queue size here
outq = queue.Queue()

# start the Writer
writer = Writer(outq)
writer.start()

# make the Pool of workers
threads = []
for i in range(POOL_SIZE):
    thread = threading.Thread(target=myFunc, name=f"worker{i}", args=(inq, outq))
    thread.start()
    threads.append(thread)

# push the work onto the queues
with open("mylines.txt", "r") as worker_data:  # open my input file.
    for url in worker_data:
        inq.put(url.strip())
for thread in threads:
    inq.put(None)

# close the pool and wait for the workers to finish
for thread in threads:
    thread.join()

# Terminate the writer
outq.put((None, None, None))
writer.join()
With the data given in mylines.txt, I see the following output:
****url is: https://www.google.com , response is: 200 https://www.google.com/ thread worker1
****url is: https://twitter.com , response is: 200 https://twitter.com/ thread worker2
****url is: https://facebook.com , response is: 200 https://www.facebook.com/ thread worker0
****url is: https://www.censys.io , response is: 200 https://censys.io/ thread worker1
****url is: https://yahoo.com , response is: 200 https://uk.yahoo.com/?p=us thread worker3
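Two design details in the listing above are worth noting. Each worker receives its own None sentinel on the input queue, and the (None, None, None) sentinel is only put on the output queue after all the workers have been joined, so the writer cannot shut down until every result has been queued. And, as the comment in the listing says, the input queue could usefully be given a maximum size; a sketch, where the factor of 2 is an arbitrary choice:

inq = queue.Queue(maxsize=2 * POOL_SIZE)  # put() blocks when full, keeping memory use bounded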