如何使用 BZ2 JSON twitter 文件有效地将大 (30GB+) TAR 文件读入 PostgreSQL
How to effectively read large (30GB+) TAR file with BZ2 JSON twitter files into PostgreSQL
我正在尝试从 archive.org archive 获取 Twitter 数据并将其加载到数据库中。我试图首先加载特定月份的 all 推文,然后选择推文并仅展示我感兴趣的推文(例如按区域设置或主题标签)。
我能够运行下面描述的脚本来完成我正在寻找的事情,但我有一个问题,它非常慢。它运行了大约半小时,并且在一个 TAR 文件中只读取了 ~ 6 / 50,000 个内部 .bz2 文件。
示例 TAR 文件的一些统计信息:
- 总大小:~ 30-40GB
- 内部 .bz2 文件数量(按文件夹排列):50,000
- 一个 .bz2 文件的大小:~600kb
- 一个提取的 JSON 文件的大小:~5 MB,~3600 条推文。
优化此过程以提高速度时我应该注意什么?
- 我应该将文件提取到磁盘而不是在 Python 中缓冲它们吗?
- 我应该看看多线程进程的一部分吗?流程的哪一部分最适合这个?
- 或者,对于这样的脚本,我目前获取的速度是不是比较正常?
该脚本当前使用了我的 CPU 的 ~ 3% 和我的 RAM 内存的 ~ 6%。
非常感谢任何帮助。
import tarfile
import dataset # Using dataset as I'm still iteratively developing the table structure(s)
import json
import datetime
def scrape_tar_contents(filename):
"""Iterates over an input TAR filename, retrieving each .bz2 container:
extracts & retrieves JSON contents; stores JSON contents in a postgreSQL database"""
tar = tarfile.open(filename, 'r')
inner_files = [filename for filename in tar.getnames() if filename.endswith('.bz2')]
num_bz2_files = len(inner_files)
bz2_count = 1
print('Starting work on file... ' + filename[-20:])
for bz2_filename in inner_files: # Loop over all files in the TAR archive
print('Starting work on inner file... ' + bz2_filename[-20:] + ': ' + str(bz2_count) + '/' + str(num_bz2_files))
t_extract = tar.extractfile(bz2_filename)
data = t_extract.read()
txt = bz2.decompress(data)
tweet_errors = 0
current_line = 1
num_lines = len(txt.split('\n'))
for line in txt.split('\n'): # Loop over the lines in the resulting text file.
if current_line % 100 == 0:
print('Working on line ' + str(current_line) + '/' + str(num_lines))
try:
tweet = json.loads(line)
except ValueError, e:
error_log = {'Date_time': datetime.datetime.now(),
'File_TAR': filename,
'File_BZ2': bz2_filename,
'Line_number': current_line,
'Line': line,
'Error': str(e)}
tweet_errors += 1
db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
print('Error occured, now at ' + str(tweet_errors))
try:
tweet_id = tweet['id']
tweet_text = tweet['text']
tweet_locale = tweet['lang']
created_at = tweet['created_at']
tweet_json = tweet
data = {'tweet_id': tweet_id,
'tweet_text': tweet_text,
'tweet_locale': tweet_locale,
'created_at_str': created_at,
'date_loaded': datetime.datetime.now(),
'tweet_json': tweet_json}
db['tweets'].upsert(data, ['tweet_id'])
except KeyError, e:
error_log = {'Date_time': datetime.datetime.now(),
'File_TAR': filename,
'File_BZ2': bz2_filename,
'Line_number': current_line,
'Line': line,
'Error': str(e)}
tweet_errors += 1
db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
print('Error occured, now at ' + str(tweet_errors))
continue
if __name__ == "__main__":
with open("postgresConnecString.txt", 'r') as f:
db_connectionstring = f.readline()
db = dataset.connect(db_connectionstring)
filename = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
scrape_tar_contents(filename)
tar 文件不包含文件所在位置的索引。此外,tar 文件可以包含 more than one copy of the same file。因此,当您提取一个文件时,整个tar文件必须被读取。即使找到文件后,仍必须读取 tar 文件的其余部分以检查是否存在后续副本。
这使得提取一个文件与提取所有文件一样昂贵。
因此,切勿在大型 tar 文件上使用 tar.extractfile(...)
(除非您只需要一个文件或没有 space 来提取所有内容)。
如果你有 space(考虑到现代硬盘驱动器的大小,你几乎肯定有),使用 tar.extractall
或系统调用 tar xf ...
提取所有内容,然后处理提取的文件。
我正在尝试从 archive.org archive 获取 Twitter 数据并将其加载到数据库中。我试图首先加载特定月份的 all 推文,然后选择推文并仅展示我感兴趣的推文(例如按区域设置或主题标签)。
我能够运行下面描述的脚本来完成我正在寻找的事情,但我有一个问题,它非常慢。它运行了大约半小时,并且在一个 TAR 文件中只读取了 ~ 6 / 50,000 个内部 .bz2 文件。
示例 TAR 文件的一些统计信息:
- 总大小:~ 30-40GB
- 内部 .bz2 文件数量(按文件夹排列):50,000
- 一个 .bz2 文件的大小:~600kb
- 一个提取的 JSON 文件的大小:~5 MB,~3600 条推文。
优化此过程以提高速度时我应该注意什么?
- 我应该将文件提取到磁盘而不是在 Python 中缓冲它们吗?
- 我应该看看多线程进程的一部分吗?流程的哪一部分最适合这个?
- 或者,对于这样的脚本,我目前获取的速度是不是比较正常?
该脚本当前使用了我的 CPU 的 ~ 3% 和我的 RAM 内存的 ~ 6%。
非常感谢任何帮助。
import tarfile
import dataset # Using dataset as I'm still iteratively developing the table structure(s)
import json
import datetime
def scrape_tar_contents(filename):
"""Iterates over an input TAR filename, retrieving each .bz2 container:
extracts & retrieves JSON contents; stores JSON contents in a postgreSQL database"""
tar = tarfile.open(filename, 'r')
inner_files = [filename for filename in tar.getnames() if filename.endswith('.bz2')]
num_bz2_files = len(inner_files)
bz2_count = 1
print('Starting work on file... ' + filename[-20:])
for bz2_filename in inner_files: # Loop over all files in the TAR archive
print('Starting work on inner file... ' + bz2_filename[-20:] + ': ' + str(bz2_count) + '/' + str(num_bz2_files))
t_extract = tar.extractfile(bz2_filename)
data = t_extract.read()
txt = bz2.decompress(data)
tweet_errors = 0
current_line = 1
num_lines = len(txt.split('\n'))
for line in txt.split('\n'): # Loop over the lines in the resulting text file.
if current_line % 100 == 0:
print('Working on line ' + str(current_line) + '/' + str(num_lines))
try:
tweet = json.loads(line)
except ValueError, e:
error_log = {'Date_time': datetime.datetime.now(),
'File_TAR': filename,
'File_BZ2': bz2_filename,
'Line_number': current_line,
'Line': line,
'Error': str(e)}
tweet_errors += 1
db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
print('Error occured, now at ' + str(tweet_errors))
try:
tweet_id = tweet['id']
tweet_text = tweet['text']
tweet_locale = tweet['lang']
created_at = tweet['created_at']
tweet_json = tweet
data = {'tweet_id': tweet_id,
'tweet_text': tweet_text,
'tweet_locale': tweet_locale,
'created_at_str': created_at,
'date_loaded': datetime.datetime.now(),
'tweet_json': tweet_json}
db['tweets'].upsert(data, ['tweet_id'])
except KeyError, e:
error_log = {'Date_time': datetime.datetime.now(),
'File_TAR': filename,
'File_BZ2': bz2_filename,
'Line_number': current_line,
'Line': line,
'Error': str(e)}
tweet_errors += 1
db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
print('Error occured, now at ' + str(tweet_errors))
continue
if __name__ == "__main__":
with open("postgresConnecString.txt", 'r') as f:
db_connectionstring = f.readline()
db = dataset.connect(db_connectionstring)
filename = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
scrape_tar_contents(filename)
tar 文件不包含文件所在位置的索引。此外,tar 文件可以包含 more than one copy of the same file。因此,当您提取一个文件时,整个tar文件必须被读取。即使找到文件后,仍必须读取 tar 文件的其余部分以检查是否存在后续副本。
这使得提取一个文件与提取所有文件一样昂贵。
因此,切勿在大型 tar 文件上使用 tar.extractfile(...)
(除非您只需要一个文件或没有 space 来提取所有内容)。
如果你有 space(考虑到现代硬盘驱动器的大小,你几乎肯定有),使用 tar.extractall
或系统调用 tar xf ...
提取所有内容,然后处理提取的文件。