在 Python 中并行读取和过滤文件
Reading and filtering file in parallel in Python
我有一个简单的脚本,它首先读取 CSV table(95MB,672343 行)并从中创建 5 个列表(字符、类型、名称、开始、结束)。然后它打开另一个文件(37MB,795516 行),读取每一行并将其与 进行比较,如果一切正常 - 将字符串写入输出文件。这需要很多时间。
for line in inputFile:
line = line[0:len(line) - 1]
arr = line.split('\t')
for i in xrange(len(chrs)):
if (arr[0]==chrs[i]) and (int(arr[1])>=int(starts[i])) and (int(arr[1])<=int(ends[i])):
outputFile.write(chrs[i]+ '\t' + types[i] + '\t' + starts[i] + '\t' + ends[i] + '\t' + names[i] + '\t' + line + '\n');
我想知道是否可以 运行 并行执行此操作,我可以访问服务器,在那里我可以一次 运行 最多 10 个进程。
问题是,你迭代了 672343 * 795516 = 534'859'613'988 次,很多。您需要更智能的解决方案。
所以我们发现问题是我们查看了太多数据,我们需要改变它。做到这一点的一种方法是尝试变得聪明。也许创建一个字典,其中键对应于 chr
所以我们只需要检查这些条目。但是我们还没有处理 start
和 end
。也许也有一种聪明的方法......
这看起来很像一个数据库。所以如果它是一个数据库,也许我们应该把它当作一个。 Python 包含 sqlite3。
这是一个解决方案,但还有无数其他的可能性。
import sqlite3
import csv
# create an in-memory database
conn = sqlite3.connect(":memory:")
# create the tables
c = conn.cursor()
c.execute("""CREATE TABLE t1 (
chr TEXT,
type TEXT,
name TEXT,
start INTEGER,
end INTEGER
);""")
# if you only have a few columns, just name them all,
# if you have a lot, maybe just put everything in one
# column as a string
c.execute("""CREATE TABLE t2 (
chr TEXT,
num INTEGER,
col3,
col4
);""")
# create indices on the columns we use for selecting
c.execute("""CREATE INDEX i1 ON t1 (chr, start, end);""")
c.execute("""CREATE INDEX i2 ON t2 (chr, num);""")
# fill the tables
with open("comparison_file.csv", 'rb') as f:
reader = csv.reader(f)
# sqlite takes care of converting the number-strings to numbers
c.executemany("INSERT INTO t1 VALUES (?, ?, ?, ?, ?)", reader)
with open("input.csv", 'rb') as f:
reader = csv.reader(f)
# sqlite takes care of converting the number-strings to numbers
c.executemany("INSERT INTO t2 VALUES (?, ?, ?, ?)", reader)
# now let sqlite do its magic and select the correct lines
c.execute("""SELECT t2.*, t1.* FROM t1
JOIN t2 ON t1.chr == t2.chr
WHERE t2.num BETWEEN t1.start AND t1.end;""")
# write result to disk
with open("output.csv", "wb") as f:
writer = csv.writer(f)
for row in c:
writer.writerow(row)
Python 编码技巧
这是我编写您的原始代码的方式。
import csv
# used to be chrs[], type[], name[], start[], end[]
comparisons = []
with open("comparison_file.csv", 'rb') as f:
reader = csv.reader(f)
for chr, type, name, start, end in reader:
comparisons.append([chr, type, name, int(start), int(end)])
with open("output.csv", 'wb') as out_file, \
open("input.csv", 'rb') as in_file:
writer = csv.writer(out_file)
reader = csv.reader(in_file)
for line in reader:
for comp in comparisons:
chr, _, _, end, start = *comp
if line[0] == chr and \
int(line[1]) >= start and \
int(line[2]) >= end:
writer.writerow(comp + line)
备注1
line = line[0:len(line) - 1]
可以写成
line = line[:-1]
备注2
而不是
my_list = [1,2,3]
for i in xrange(len(my_list)):
# do something with my_list[i]
你应该做的:
my_list = [1,2,3]
for item in my_list:
# do something with item
如果需要索引,结合enumerate()
即可。
我能想到一些可能会加快进度的事情,首先是重新排列第一个文件中的数据。
与其将其变成 5 个单独的 list
,不如将其变成 dict
个 list
个 tuple
个 chr
值键:
import csv
import collections
import bisect
# Use a defaultdict so we don't have to worry about whether a chr already exists
foobars = collections.defaultdict(list)
with open('file1.csv', 'rb') as csvfile:
rdr = csv.reader(csvfile)
for (chrs, typ, name, start, end) in rdr:
foobars[chrs].append((int(start), int(end), typ, name))
然后对 foobars
中的每个列表进行排序(您显然应该将其重命名为适合您的任务的名称),这将首先按 start
值排序,因为我们将其放在元组中的第一位:
for lst in foobars.values():
lst.sort()
现在处理您的第二个文件:
for line in inputFile:
line = line.rstrip('\n')
arr = line.split('\t')
arr1int = int(arr[1])
# Since we rearranged our data, we only have to check one of our sublists
search = foobars[arr[0]]
# We use bisect to quickly find the first item where the start value
# is higher than arr[1]
highest = bisect.bisect(search, (arr1int + 1,))
# Now we have a much smaller number of records to check, and we've
# already ensured that chr is a match, and arr[1] >= start
for (start, end, typ, name) in search[:highest]:
if arr1int <= end:
outputFile.write('\t'.join((arr[0], typ, str(start), str(end), name, line)) + '\n')
bisect.bisect()
行值得额外解释一下。如果您有一个排序的值序列,bisect
可用于查找将新值插入到序列中的位置。我们在这里使用它来找到列表中 start
大于我们的 arr[1]
的第一个值(花点时间想想这些概念是如何相关的)。 odd-looking (arr1int + 1,)
值只是确保我们包含所有 start == arr[1]
的条目并将其转换为元组,以便我们比较相似的值。
这几乎肯定会提高代码的性能。到底有多少我真的没有资格说。
没有输入数据我无法真正测试这段代码,所以几乎肯定会有小错误。希望它们很容易修复。
我有一个简单的脚本,它首先读取 CSV table(95MB,672343 行)并从中创建 5 个列表(字符、类型、名称、开始、结束)。然后它打开另一个文件(37MB,795516 行),读取每一行并将其与 进行比较,如果一切正常 - 将字符串写入输出文件。这需要很多时间。
for line in inputFile:
line = line[0:len(line) - 1]
arr = line.split('\t')
for i in xrange(len(chrs)):
if (arr[0]==chrs[i]) and (int(arr[1])>=int(starts[i])) and (int(arr[1])<=int(ends[i])):
outputFile.write(chrs[i]+ '\t' + types[i] + '\t' + starts[i] + '\t' + ends[i] + '\t' + names[i] + '\t' + line + '\n');
我想知道是否可以 运行 并行执行此操作,我可以访问服务器,在那里我可以一次 运行 最多 10 个进程。
问题是,你迭代了 672343 * 795516 = 534'859'613'988 次,很多。您需要更智能的解决方案。
所以我们发现问题是我们查看了太多数据,我们需要改变它。做到这一点的一种方法是尝试变得聪明。也许创建一个字典,其中键对应于 chr
所以我们只需要检查这些条目。但是我们还没有处理 start
和 end
。也许也有一种聪明的方法......
这看起来很像一个数据库。所以如果它是一个数据库,也许我们应该把它当作一个。 Python 包含 sqlite3。
这是一个解决方案,但还有无数其他的可能性。
import sqlite3
import csv
# create an in-memory database
conn = sqlite3.connect(":memory:")
# create the tables
c = conn.cursor()
c.execute("""CREATE TABLE t1 (
chr TEXT,
type TEXT,
name TEXT,
start INTEGER,
end INTEGER
);""")
# if you only have a few columns, just name them all,
# if you have a lot, maybe just put everything in one
# column as a string
c.execute("""CREATE TABLE t2 (
chr TEXT,
num INTEGER,
col3,
col4
);""")
# create indices on the columns we use for selecting
c.execute("""CREATE INDEX i1 ON t1 (chr, start, end);""")
c.execute("""CREATE INDEX i2 ON t2 (chr, num);""")
# fill the tables
with open("comparison_file.csv", 'rb') as f:
reader = csv.reader(f)
# sqlite takes care of converting the number-strings to numbers
c.executemany("INSERT INTO t1 VALUES (?, ?, ?, ?, ?)", reader)
with open("input.csv", 'rb') as f:
reader = csv.reader(f)
# sqlite takes care of converting the number-strings to numbers
c.executemany("INSERT INTO t2 VALUES (?, ?, ?, ?)", reader)
# now let sqlite do its magic and select the correct lines
c.execute("""SELECT t2.*, t1.* FROM t1
JOIN t2 ON t1.chr == t2.chr
WHERE t2.num BETWEEN t1.start AND t1.end;""")
# write result to disk
with open("output.csv", "wb") as f:
writer = csv.writer(f)
for row in c:
writer.writerow(row)
Python 编码技巧
这是我编写您的原始代码的方式。
import csv
# used to be chrs[], type[], name[], start[], end[]
comparisons = []
with open("comparison_file.csv", 'rb') as f:
reader = csv.reader(f)
for chr, type, name, start, end in reader:
comparisons.append([chr, type, name, int(start), int(end)])
with open("output.csv", 'wb') as out_file, \
open("input.csv", 'rb') as in_file:
writer = csv.writer(out_file)
reader = csv.reader(in_file)
for line in reader:
for comp in comparisons:
chr, _, _, end, start = *comp
if line[0] == chr and \
int(line[1]) >= start and \
int(line[2]) >= end:
writer.writerow(comp + line)
备注1
line = line[0:len(line) - 1]
可以写成
line = line[:-1]
备注2
而不是
my_list = [1,2,3]
for i in xrange(len(my_list)):
# do something with my_list[i]
你应该做的:
my_list = [1,2,3]
for item in my_list:
# do something with item
如果需要索引,结合enumerate()
即可。
我能想到一些可能会加快进度的事情,首先是重新排列第一个文件中的数据。
与其将其变成 5 个单独的 list
,不如将其变成 dict
个 list
个 tuple
个 chr
值键:
import csv
import collections
import bisect
# Use a defaultdict so we don't have to worry about whether a chr already exists
foobars = collections.defaultdict(list)
with open('file1.csv', 'rb') as csvfile:
rdr = csv.reader(csvfile)
for (chrs, typ, name, start, end) in rdr:
foobars[chrs].append((int(start), int(end), typ, name))
然后对 foobars
中的每个列表进行排序(您显然应该将其重命名为适合您的任务的名称),这将首先按 start
值排序,因为我们将其放在元组中的第一位:
for lst in foobars.values():
lst.sort()
现在处理您的第二个文件:
for line in inputFile:
line = line.rstrip('\n')
arr = line.split('\t')
arr1int = int(arr[1])
# Since we rearranged our data, we only have to check one of our sublists
search = foobars[arr[0]]
# We use bisect to quickly find the first item where the start value
# is higher than arr[1]
highest = bisect.bisect(search, (arr1int + 1,))
# Now we have a much smaller number of records to check, and we've
# already ensured that chr is a match, and arr[1] >= start
for (start, end, typ, name) in search[:highest]:
if arr1int <= end:
outputFile.write('\t'.join((arr[0], typ, str(start), str(end), name, line)) + '\n')
bisect.bisect()
行值得额外解释一下。如果您有一个排序的值序列,bisect
可用于查找将新值插入到序列中的位置。我们在这里使用它来找到列表中 start
大于我们的 arr[1]
的第一个值(花点时间想想这些概念是如何相关的)。 odd-looking (arr1int + 1,)
值只是确保我们包含所有 start == arr[1]
的条目并将其转换为元组,以便我们比较相似的值。
这几乎肯定会提高代码的性能。到底有多少我真的没有资格说。
没有输入数据我无法真正测试这段代码,所以几乎肯定会有小错误。希望它们很容易修复。