Multiprocessing big CSV's does not return expected amount of rows
I am trying to help someone out with something. I am by no means a professional programmer, but what I want to do is compute a value from one CSV based on the year and an ID from another CSV. If I statically put in a smaller sample size for timing and testing purposes (amount_of_reviews uses a 180 MB CSV), the program works as I expect. But when I let it process all the data, I seem to lose about 2,000 of the expected 20,245 results (did one of the threads fail, maybe?). I am using multiprocessing to reduce the time the program takes to run. I will go ahead and post all of my code, and I hope someone experienced can spot my mistake.
import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime
from ctypes import c_char_p

print(datetime.datetime.now())

with open('D:/temp/listings.csv', encoding="utf8") as f:
    reader = csv.reader(f)
    f.seek(0)
    idSet = set()
    for row in reader:
        idSet.add(row[0])

idList = list(idSet)
idList = sorted(idList)

listings = []

def amount_of_reviews_2019(id):
    total = 0
    with open('D:/temp/reviews.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(0)
        next(reader)
        for row in reader:
            if int(row[2][:4]) >= 2019 and row[0] == id:
                total = total + 1
    return total

def calc(id):
    with open('D:/temp/listings.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(1)
        listing = []
        for row in reader:
            if row[0] == id:
                listing.append(row[0])
                listing.append(row[48])
                listing.append(row[49])
                listing.append(amount_of_reviews_2019(id))
    listings.append(listing)
    print(len(listings))

def format_csv(data, lock):
    with lock:
        with open('D:/temp/multiprocessing.csv', 'a+', newline='', encoding="utf8") as csvfile:
            filewriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            print(data)
            filewriter.writerows(data)
            #for y in data:
            #    filewriter.writerow([y[0], y[1], y[2], y[3]])

def do(counter, lock):
    for id in idList:
        if counter.value < len(idList):  #len(idList) = 20245 #When I put, let's say, 15 here I get all 15 expected results
            with counter.get_lock():
                counter.value += 1  #I am aware I skip the 0 index here
            print(counter.value)
            calc(idList[counter.value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()
    print(len(idList))
    sharedCounter = Value('i', 0)
    processes = []
    for i in range(os.cpu_count()):
        print('registering process %d' % i)
        processes.append(Process(target=do, args=(sharedCounter, lock)))
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(datetime.datetime.now())
This code looks like it has a race condition:
with counter.get_lock():
    counter.value += 1  #I am aware I skip the 0 index here
print(counter.value)
calc(idList[counter.value])
You increment counter while holding its lock, which is fine. However, you then look up the counter's value in idList[counter.value] outside the lock. Another thread/process may therefore have changed the counter in the meantime, in which case you read an unexpected value from it. The safe way to write this code is:
value = None
with counter.get_lock():
    counter.value += 1  #I am aware I skip the 0 index here
    value = counter.value
print(value)
calc(idList[value])
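Note that the bounds check (counter.value < len(idList)) in your do() also still happens outside the lock, so a worker can claim an index one past the end of idList; the version below avoids this by moving that check inside the lock as well.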
EDIT: Here is a version of your code with all race conditions removed (I believe) and with the file I/O removed as well. It works fine for me. Maybe you can add the file I/O back in piece by piece to see where things go wrong:
import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime

print(datetime.datetime.now())

idSet = set(range(20245))
idList = list(idSet)
idList = sorted(idList)

listings = []
totalCounter = Value('i', 0)

def calc(id):
    listing = []
    listings.append(listing)

def format_csv(data, lock):
    with lock:
        totalCounter.value += len(data)

def do(counter, lock):
    for id in idList:
        value = None
        with counter.get_lock():
            if counter.value < len(idList):
                value = counter.value
                counter.value += 1
        if value is not None:
            calc(idList[value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()
    sharedCounter = Value('i', 0)
    processes = []
    for i in range(os.cpu_count()):
        processes.append(Process(target=do, args=(sharedCounter, lock)))
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(datetime.datetime.now())
    print('len(idList): %d, total: %d' % (len(idList), totalCounter.value))
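For comparison, a minimal sketch (not from the original program) of the same fan-out using multiprocessing.Pool: map hands each id to exactly one worker and collects one result per id, so no hand-rolled shared counter or lock is needed at all. calc is reduced to a stub here.

import os
from multiprocessing import Pool

def calc(id):
    # stub for the real per-id work (reading the CSVs, counting reviews)
    return (id, 0)

if __name__ == '__main__':
    idList = sorted(set(range(20245)))  # same dummy ids as above
    with Pool(os.cpu_count()) as pool:
        # map distributes the ids across the worker processes and
        # returns one result per id, in order
        results = pool.map(calc, idList)
    print('len(idList): %d, total: %d' % (len(idList), len(results)))

The trade-off is that the results only exist in the parent after map returns, so the CSV writing would happen once, in the parent, instead of in each worker.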
I suggest using pandas to read the files (thanks Alexander). Then iterate over the listings and sum all reviews that have that particular id and are after 2019:
import numpy as np
import pandas
import datetime
import time

listing_csv_filename = r'listings.csv'
reviews_csv_filename = r'reviews.csv'

start = time.time()
df_listing = pandas.read_csv(listing_csv_filename, delimiter=',', quotechar='"')
df_reviews = pandas.read_csv(reviews_csv_filename, delimiter=',', parse_dates=[1])

values = list()
valid_year = df_reviews['date'] > datetime.datetime(2019, 1, 1, 0, 0, 0)
for id_num in df_listing['id']:
    valid = (df_reviews['listing_id'] == id_num) & valid_year
    values.append((id_num, np.sum(valid)))

print(values)
print(time.time() - start)
Diagnosis
Without digging too deeply, I would say there are two culprits here, and they go hand in hand:
The first is the repeated file parsing and iteration. You iterate over every ID in the "main loop", so 20,025 times. For each ID you then read and loop over the entire listings file (20,051 rows) and the entire reviews file (493,816 rows). That adds up to reading 20,025 × (20,051 + 493,816) = 10,290,186,675 rows of CSV.
The second is the multiprocessing itself. I haven't looked into it in depth, but I think it's fair to say we get a pretty good picture of the problem from the code alone. As we saw above, your program opens the two CSV files for every single ID. A bunch of processes all hitting the same two files, 20,000-odd times in total, is not good for performance. I wouldn't be entirely surprised if the code ran faster without multiprocessing than with it. There is also the potential race condition mentioned by Daniel Junglas.
Solutions
1.
Alright, it's still a bit of a mess, but I just wanted to get something out before the turn of the century. I'll keep looking for a better solution. The ideal solution will probably differ a bit depending on the number of listings that appear in the reviews but not in listings.csv.
import numpy as np
import pandas as pd
listings_df = pd.read_csv('../resources/listings.csv', header=0, usecols=['id'], dtype={'id': str})
reviews_df = pd.read_csv('../resources/reviews.csv', header=0, parse_dates=['date'], dtype={'listing_id': str})
valid_reviews = reviews_df[reviews_df['date'] >= pd.Timestamp(year=2019, month=1, day=1)]
review_id_counts = valid_reviews['listing_id'].value_counts()
counts_res: pd.DataFrame = pd.merge(listings_df, review_id_counts, left_on='id', right_index=True, how='left').rename(columns={'listing_id': 'review_count'})
counts_res['review_count'] = counts_res['review_count'].fillna(0).astype(np.int64)
counts_res.to_csv(path_or_buf='../out/listing_review_counts.csv', index=False)
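For what it's worth: value_counts() produces a Series indexed by listing_id, and the left merge keeps listings that have no 2019 reviews; their counts come out as NaN, which is why they are filled with 0 and cast back to integers afterwards.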
The runtime is about 1 second, which means I did beat the 5-seconds-or-less target. Yay :)
2.
This approach counts the reviews with a dictionary, using the standard csv module. Keep in mind that it will raise an error if a review is for a listing that is not in listings.csv.
import csv
import datetime

with open('../resources/listings.csv') as listings_file:
    reader = csv.DictReader(listings_file)
    listing_review_counts = dict.fromkeys((row['id'] for row in reader), 0)

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    for row in reader:
        rev_date = datetime.datetime.fromisoformat(row['date']).date()
        if rev_date >= cutoff_date:
            listing_review_counts[row['listing_id']] += 1

with open('../out/listing_review_counts_2.csv', 'w', newline='') as out_file:
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(listing_review_counts.items())
3.
This approach uses collections.Counter and the standard csv module.
import collections as colls
import csv
import datetime

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    review_listing_counts = colls.Counter(
        (row['listing_id'] for row in reader if datetime.datetime.fromisoformat(row['date']).date() >= cutoff_date))

with open('../resources/listings.csv') as listings_file, open('../out/listing_review_counts_3.csv', 'w',
                                                              newline='') as out_file:
    reader = csv.DictReader(listings_file)
    listings_ids = (row['id'] for row in reader)
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(((curr_id, review_listing_counts[curr_id]) for curr_id in listings_ids))
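Since a Counter returns 0 for missing keys, listings with no 2019 reviews automatically get a count of 0 here; unlike the dictionary version above, though, reviews for listings that are not in listings.csv are silently ignored rather than raising an error.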
Let me know if you have any questions, whether I should include some explanations, etc. :)