Efficient and fast way to count word frequencies and sort the list in Python
I have finished analyzing my text data, and now I want to count the keywords in the analysis results that satisfy specific conditions (date, category, etc.). Each day's results contain more than 50,000 rows, and I have 1,500 conditions. Is there an efficient/fast way to extract the keywords that match the conditions?
Below is the code I wrote. It is very slow, so I need a more efficient approach.
import ast
import datetime
from collections import defaultdict
from pathlib import PurePath
from typing import DefaultDict

import pandas as pd

# function for counting keywords
def count_words(top_rel: DefaultDict, top_pos: DefaultDict, top_neg: DefaultDict, data: pd.DataFrame):
    # columns "1", "2", "3" hold word lists, stored either as serialized strings or as actual lists
    if isinstance(data.loc[:, "3"].values[0], str):
        for i, item in data.loc[:, "0":"3"].iterrows():
            for pos_word in ast.literal_eval(item["1"]):
                top_pos[pos_word] += 1
            for neg_word in ast.literal_eval(item["2"]):
                top_neg[neg_word] += 1
            for rel_word in ast.literal_eval(item["3"]):
                top_rel[rel_word] += 1
    else:
        for i, item in data.loc[:, "0":"3"].iterrows():
            for pos_word in item["1"]:
                top_pos[pos_word] += 1
            for neg_word in item["2"]:
                top_neg[neg_word] += 1
            for rel_word in item["3"]:
                top_rel[rel_word] += 1
    return top_rel, top_pos, top_neg
# Create conditions
cat_ids = [subcats['id'] for subcats in cp.cat_config['cat'].values()]  # cat ids in the category table
index = pd.MultiIndex.from_product(
    [cat_ids,
     data.code.unique(),
     [start_date.strftime("%Y%m%d")],
     data.target.unique(),
     [datetime.datetime.strptime(str(data._dates.unique()[0]), "%Y%m%d").date().isocalendar()[1]]],
    names=["category_code", "region_code", "start_date", "target", "year_week"])  # Cartesian product
top_word_id = pd.DataFrame(index=index).reset_index()
# Create a defaultdict for each condition
top_word_id.loc[:, 'weekly_associated_top_word'] = [defaultdict(int) for _ in range(top_word_id.shape[0])]
top_word_id.loc[:, 'weekly_positive_top_word'] = [defaultdict(int) for _ in range(top_word_id.shape[0])]
top_word_id.loc[:, 'weekly_negative_top_word'] = [defaultdict(int) for _ in range(top_word_id.shape[0])]
# for specific periods,
while dates_queue:
    date = dates_queue.popleft()
    date_str = date.strftime("%Y%m%d.tsv")
    data = pd.read_csv(PurePath("../results", date_str), sep='\t', engine='python', encoding='utf-8')
    for i, item in top_word_id.iterrows():  # for each condition
        # find data matched to the condition
        id = item.loc["category_code"]
        target = item.loc['target']
        code = item.loc['region_code']
        category_data = data[data.loc[:, id] == 1]
        if category_data.shape[0] == 0:
            continue
        temp = category_data[(category_data.loc[:, 'target'] == target) & (category_data.loc[:, 'code'] == code)]
        if temp.shape[0] == 0:
            continue
        top_rel, top_pos, top_neg = count_words(top_word_id.iloc[i, 6], top_word_id.iloc[i, 7], top_word_id.iloc[i, 8], data)
        top_word_id.at[i, "weekly_associated_top_word"] = top_rel
        top_word_id.at[i, "weekly_positive_top_word"] = top_pos
        top_word_id.at[i, "weekly_negative_top_word"] = top_neg
EDIT
I would really like to show you a sample, but it is too large and it is in Korean, so I doubt you could read it. Instead, I describe the logic in pseudocode; a toy illustration follows the list below.
- Input
  - `data` (`pd.DataFrame`): the input is the collection of documents for one day. It has columns named `target`, `category`, and `code`. In addition, it contains columns named `0, 1, 2, 3`, where each element is a list of words (e.g. `data.loc[0, "0"] = ['a', 'b', 'c']`, `data.loc[0, "1"] = ['hello', 'world', '.']`).
  - `top_word_id` (`pd.DataFrame`): each row of this DataFrame represents one condition.
- Algorithm: what I want is to find the rows of `data` that satisfy a specific condition (i.e. `target`, `category`, and `code`). As mentioned above, each condition is one row of `top_word_id`.
- Output: suppose I want to find the data that satisfies the condition in the j-th row of `top_word_id`, and that two rows of the data match it, say rows i1 and i2. Then I want to aggregate the word frequencies of rows i1 and i2. The aggregated result must be kept, because I will keep aggregating word frequencies over today's and tomorrow's documents.
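To make the shapes concrete, here is a minimal toy illustration of one condition being matched and aggregated. It follows the description above, but the values and the `cat01` category-flag column name are invented for the example, and only the word-list columns used in the aggregation are shown.

```python
import pandas as pd
from collections import Counter

# One day's documents: word-list columns plus the condition columns.
data = pd.DataFrame({
    "target": ["A", "A", "B"],
    "code":   ["seoul", "seoul", "busan"],
    "cat01":  [1, 1, 0],                                   # hypothetical category membership flag
    "1": [["good", "fast"], ["good"], ["bad"]],            # positive words
    "2": [["slow"], [], ["bad", "slow"]],                  # negative words
    "3": [["service"], ["service", "price"], ["price"]],   # related words
})

# One condition (one row of top_word_id): category "cat01", target "A", code "seoul".
matched = data[(data["cat01"] == 1) & (data["target"] == "A") & (data["code"] == "seoul")]

# Aggregate the word frequencies over the matched rows and keep the running total.
top_pos = Counter()
for words in matched["1"]:
    top_pos.update(words)
print(top_pos)  # Counter({'good': 2, 'fast': 1})
```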
For the simple use case, given an iterable you can use a collections.Counter object (https://docs.python.org/3/library/collections.html#collections.Counter), e.g.
>>> from collections import Counter
>>> mylist = [1,2,3,3,2,1]
>>> Counter(mylist)
Counter({1: 2, 2: 2, 3: 2})
Given a string:
>>> text = "This is a sentence with repeated words words words in the sentence"
>>> tokenized_text = text.split()
>>> Counter(tokenized_text)
Counter({'This': 1,
         'is': 1,
         'a': 1,
         'sentence': 2,
         'with': 1,
         'repeated': 1,
         'words': 3,
         'in': 1,
         'the': 1})
To update the counter:
>>> counter = Counter()
>>> counter.update(tokenized_text_1) # assuming tokenized text is an iterable of strings.
>>> counter.update(tokenized_text_2)
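Counter.update also accepts another Counter (or any mapping) and adds the counts instead of replacing them, so per-row Counters can be merged directly into a running total. The snippet below is only a sketch of that pattern; counter_1 and counter_2 are placeholder names.
>>> from collections import Counter
>>> total = Counter()
>>> counter_1 = Counter(['hello', 'world'])
>>> counter_2 = Counter(['hello'])
>>> total.update(counter_1)  # adds counts from another Counter, not just an iterable
>>> total.update(counter_2)
>>> total
Counter({'hello': 2, 'world': 1})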
I implemented efficient and fast logic using collections.Counter, Cython, and multiprocessing.Pool. I replaced the counting part with Counter, and used Cython and multiprocessing.Pool to improve efficiency.
The full code is below:
import ast

import pandas as pd
from collections import defaultdict, Counter
from typing import DefaultDict

def count_words(top_pos: DefaultDict, top_neg: DefaultDict, top_rel: DefaultDict, data: pd.DataFrame):
    # Build a per-row Counter for each word-list column, then merge the row
    # Counters into the running per-condition totals.
    if isinstance(data.loc[:, "3"].values[0], str):
        data_pos = data.loc[:, "1"].apply(lambda x: Counter(ast.literal_eval(x)))
        data_neg = data.loc[:, "2"].apply(lambda x: Counter(ast.literal_eval(x)))
        data_rel = data.loc[:, "3"].apply(lambda x: Counter(ast.literal_eval(x)))
        for item in data_pos:
            for k, v in item.items():
                top_pos[k] += v
        for item in data_neg:
            for k, v in item.items():
                top_neg[k] += v
        for item in data_rel:
            for k, v in item.items():
                top_rel[k] += v
    elif isinstance(data.loc[:, "3"].values[0], list):
        data_pos = data.loc[:, "1"].apply(lambda x: Counter(x))
        data_neg = data.loc[:, "2"].apply(lambda x: Counter(x))
        data_rel = data.loc[:, "3"].apply(lambda x: Counter(x))
        for item in data_pos:
            for k, v in item.items():
                top_pos[k] += v
        for item in data_neg:
            for k, v in item.items():
                top_neg[k] += v
        for item in data_rel:
            for k, v in item.items():
                top_rel[k] += v
    else:
        raise ValueError("The type must be either list or str")
    return top_pos, top_neg, top_rel
def test(data, top_word_id):
    for i, item in top_word_id.iterrows():
        id = item.loc["category_code"]
        target = item.loc['target']
        code = item.loc['region_code']
        category_data = data[data.loc[:, id] == 1]
        if category_data.shape[0] == 0:
            continue
        temp = category_data[(category_data.loc[:, 'target'] == target) & (category_data.loc[:, 'code'] == code)]
        if temp.shape[0] == 0:
            continue
        top_pos, top_neg, top_rel = count_words(top_word_id.loc[i, "weekly_positive_top_word"], top_word_id.loc[i, "weekly_negative_top_word"], top_word_id.loc[i, "weekly_associated_top_word"], data)
        top_word_id.at[i, "weekly_associated_top_word"] = top_rel
        top_word_id.at[i, "weekly_positive_top_word"] = top_pos
        top_word_id.at[i, "weekly_negative_top_word"] = top_neg
    return top_word_id
from multiprocessing import Pool, cpu_count
from contextlib import contextmanager
import numpy as np

@contextmanager
def poolcontext(*args, **kwargs):
    try:
        pool = Pool(*args, **kwargs)
        yield pool
    finally:
        pool.terminate()

def parallelize_aggregation(data, top_word_id, func):
    # Split the conditions across cores; every worker gets the full day's data
    # plus its own slice of top_word_id.
    num_cores = cpu_count()
    df_split = np.array_split(top_word_id, num_cores, axis=0)
    with poolcontext(processes=num_cores) as pool:
        results = pool.starmap(func, zip([data for _ in range(num_cores)], df_split))
    return results

parallelize_aggregation(data, top_word_id, test)
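Since the pool returns one partial top_word_id slice per worker, the caller still has to stitch the slices back together. A minimal way to do that (an addition to the code above, not part of it) is:

```python
import pandas as pd

# Each worker returns its slice of top_word_id with updated count columns;
# concatenating the slices restores a single DataFrame of conditions.
results = parallelize_aggregation(data, top_word_id, test)
top_word_id = pd.concat(results).sort_index()
```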
The table below shows the running times of each version of the code:

| Code | Time |
| --- | --- |
| Cython (the code in the question) | 4749 s |
| Cython + `Counter` | 3066 s |
| Cython + `Counter` + `multiprocessing.Pool` | 10 s |