如何使用 threadpool.executor 在线程之间共享计数器变量并递增它?
How to share counter variable among threads using threadpool.executor and increment it?
下面是我在python3.x
中实现的一个线程池执行器
with ThreadPoolExecutor(max_workers=15) as ex:
f = open(filename, 'r', encoding='UTF-8')
results = {ex.submit(callreadline, files ): files for files in f.readlines() }
results 变量包含以下格式的值:
words and their corresponding 200 dimensional embedding
您可以看到这些值是元组。第一个值是一个单词,第二个值是 200 维数组。值的数量总共是 400000。所以有400000个元组。
现在我要做的是创建另一个执行以下任务的线程池执行器
- 创建元组列表中第一个值的有序字典。这意味着前 4 个元组值的 say 词是 the,is,are,said。然后有序字典将包含:
{the:0,is:1,are:2,said:3,...…………….hello:399999}
- 创建一个 numpy nd 数组,其中包含有序字典中相应单词的 200 维数组(通过相应单词,我的意思是第一个条目将是单词 the[=47= 的 200 维数组],那么 的 200 维数组是 ...等等)。所以 numpy nd 数组的维度是 400000 * 200.
我在下面的代码中使用 for 循环
count = 0
word_to_idx = OrderedDict()
vectors = []
for future in results.result:
b = future.result()
word_to_idx[count] = b[0]
if(count == 0):
vectors = np.array([b[1]])
else:
vectors = np.append(vectors,np.array([b[1]]),axis=0)
count = count +1
在上述函数的末尾,我返回了 word_to_idx 和完成工作的向量。但是循环400000个元组,一个一个赋值给变量,耗时极长(10小时左右)。
所以我在想是否有一种方法可以同时使用线程池执行程序来并行化此功能。
我正在考虑创建线程,然后与每个线程共享一个计数器变量,一次访问一个共享变量。然后该线程将递增该变量,然后另一个线程将访问递增的计数器。有人能给我指出正确的方向吗?
编辑:
这里是 call readline 函数,它在 15 个 worker 中调用时运行得非常快:
def callreadline(line):
# word_to_idx = OrderedDict()
word_to_idx = OrderedDict()
vectors = []
vocabulary = None
word_to_idx = read_w2v_word(line.split(' ')[0])
try:
vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
except:
vectors = np.array(line.split(' ')[1:],dtype=float)
if vocabulary is not None:
word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
return word_to_idx,vectors
我感觉 callreadline 函数甚至还没有尽可能快,但这不是问题的一部分,所以让我尝试为您解决其余问题:
with ThreadPoolExecutor(max_workers=15) as ex:
f = open(filename, 'r', encoding='UTF-8')
results = [ex.submit(callreadline, files) for files in f.readlines()]
word_to_idx = dict()
vectors = []
for count, future in enumerate(results):
b = future.result()
word_to_idx[b[0]] = count
vectors.append(b[1])
vectors = np.array(vectors)
下面是我在python3.x
中实现的一个线程池执行器 with ThreadPoolExecutor(max_workers=15) as ex:
f = open(filename, 'r', encoding='UTF-8')
results = {ex.submit(callreadline, files ): files for files in f.readlines() }
results 变量包含以下格式的值:
words and their corresponding 200 dimensional embedding
您可以看到这些值是元组。第一个值是一个单词,第二个值是 200 维数组。值的数量总共是 400000。所以有400000个元组。
现在我要做的是创建另一个执行以下任务的线程池执行器
- 创建元组列表中第一个值的有序字典。这意味着前 4 个元组值的 say 词是 the,is,are,said。然后有序字典将包含:
{the:0,is:1,are:2,said:3,...…………….hello:399999}
- 创建一个 numpy nd 数组,其中包含有序字典中相应单词的 200 维数组(通过相应单词,我的意思是第一个条目将是单词 the[=47= 的 200 维数组],那么 的 200 维数组是 ...等等)。所以 numpy nd 数组的维度是 400000 * 200.
我在下面的代码中使用 for 循环
count = 0
word_to_idx = OrderedDict()
vectors = []
for future in results.result:
b = future.result()
word_to_idx[count] = b[0]
if(count == 0):
vectors = np.array([b[1]])
else:
vectors = np.append(vectors,np.array([b[1]]),axis=0)
count = count +1
在上述函数的末尾,我返回了 word_to_idx 和完成工作的向量。但是循环400000个元组,一个一个赋值给变量,耗时极长(10小时左右)。
所以我在想是否有一种方法可以同时使用线程池执行程序来并行化此功能。
我正在考虑创建线程,然后与每个线程共享一个计数器变量,一次访问一个共享变量。然后该线程将递增该变量,然后另一个线程将访问递增的计数器。有人能给我指出正确的方向吗?
编辑:
这里是 call readline 函数,它在 15 个 worker 中调用时运行得非常快:
def callreadline(line):
# word_to_idx = OrderedDict()
word_to_idx = OrderedDict()
vectors = []
vocabulary = None
word_to_idx = read_w2v_word(line.split(' ')[0])
try:
vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
except:
vectors = np.array(line.split(' ')[1:],dtype=float)
if vocabulary is not None:
word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
return word_to_idx,vectors
我感觉 callreadline 函数甚至还没有尽可能快,但这不是问题的一部分,所以让我尝试为您解决其余问题:
with ThreadPoolExecutor(max_workers=15) as ex:
f = open(filename, 'r', encoding='UTF-8')
results = [ex.submit(callreadline, files) for files in f.readlines()]
word_to_idx = dict()
vectors = []
for count, future in enumerate(results):
b = future.result()
word_to_idx[b[0]] = count
vectors.append(b[1])
vectors = np.array(vectors)