PySpark 马尔可夫模型的算法/编码帮助
Algorithmic / coding help for a PySpark markov model
我需要一些帮助来让我的大脑围绕在 spark 中设计一个(高效的)马尔可夫链(通过 python)。我已经尽我所能地编写了它,但是我想出的代码无法扩展。基本上对于各个地图阶段,我编写了自定义函数并且它们可以很好地处理几千个序列,但是当我们得到在 20,000 多个(我有一些高达 800k)中,事情进展缓慢。
对于那些不熟悉马尔可夫穆德尔的人,这就是它的要点..
这是我的数据。此时我在 RDD 中获得了实际数据(没有 header)。
ID, SEQ
500, HNL, LNH, MLH, HML
我们查看元组中的序列,所以
(HNL, LNH), (LNH,MLH), etc..
我需要达到这一点.. 我 return 一个字典(针对每一行数据),然后将其序列化并存储在内存数据库中。
{500:
{HNLLNH : 0.333},
{LNHMLH : 0.333},
{MLHHML : 0.333},
{LNHHNL : 0.000},
etc..
}
所以本质上,每个序列都与下一个序列组合(HNL,LNH 变为 'HNLLNH'),然后对于所有可能的转换(序列组合),我们计算它们的出现次数,然后除以总数转换(在本例中为 3)并获得它们的出现频率。
上面有 3 个转换,其中一个是 HNLLNH。所以对于 HNLLNH,1/3 = 0.333
另一方面,我不确定它是否相关,但序列中每个位置的值都是有限的。第一个位置 (H/M/L),第二个位置 (M/L), 第三位置 (H,M,L)。
我的代码之前所做的是收集()rdd,并使用我编写的函数将其映射几次。这些函数首先将字符串变成一个列表,然后将 list[1] 与 list[2] 合并,然后将 list[2] 与 list[3] 合并,然后将 list[3] 与 list[4] 合并,等等。所以我结束了想出这样的东西..
[HNLLNH],[LNHMLH],[MHLHML], etc..
然后下一个函数从该列表中创建一个字典,使用列表项作为键,然后计算该键在完整列表中的总出现次数,除以 len(list) 以获得频率。然后我将该字典连同它的 ID 号一起包装在另一个字典中(产生第二个代码块,在上面)。
就像我说的,这对 small-ish 序列很有效,但对长度超过 100k 的列表不太有效。
此外,请记住,这只是一行数据。我必须对 10-20k 行数据的任何地方执行此操作,数据行的长度在每行 500-800,000 个序列之间变化。
关于如何编写 pyspark 代码(使用 API map/reduce/agg/etc.. 函数)以高效执行此操作的任何建议?
编辑
代码如下。从底部开始可能是有意义的。请记住,我正在学习这个(Python 和 Spark),我不是以此为生,所以我的编码标准不是很好..
def f(x):
# Custom RDD map function
# Combines two separate transactions
# into a single transition state
cust_id = x[0]
trans = ','.join(x[1])
y = trans.split(",")
s = ''
for i in range(len(y)-1):
s= s + str(y[i] + str(y[i+1]))+","
return str(cust_id+','+s[:-1])
def g(x):
# Custom RDD map function
# Calculates the transition state probabilities
# by adding up state-transition occurrences
# and dividing by total transitions
cust_id=str(x.split(",")[0])
trans = x.split(",")[1:]
temp_list=[]
middle = int((len(trans[0])+1)/2)
for i in trans:
temp_list.append( (''.join(i)[:middle], ''.join(i)[middle:]) )
state_trans = {}
for i in temp_list:
state_trans[i] = temp_list.count(i)/(len(temp_list))
my_dict = {}
my_dict[cust_id]=state_trans
return my_dict
def gen_tsm_dict_spark(lines):
# Takes RDD/string input with format CUST_ID(or)PROFILE_ID,SEQ,SEQ,SEQ....
# Returns RDD of dict with CUST_ID and tsm per customer
# i.e. {cust_id : { ('NLN', 'LNN') : 0.33, ('HPN', 'NPN') : 0.66}
# creates a tuple ([cust/profile_id], [SEQ,SEQ,SEQ])
cust_trans = lines.map(lambda s: (s.split(",")[0],s.split(",")[1:]))
with_seq = cust_trans.map(f)
full_tsm_dict = with_seq.map(g)
return full_tsm_dict
def main():
result = gen_tsm_spark(my_rdd)
# Insert into DB
for x in result.collect():
for k,v in x.iteritems():
db_insert(k,v)
您可以试试下面的方法。它在很大程度上依赖于 tooolz
,但如果您希望避免外部依赖,您可以轻松地将其替换为一些标准 Python 库。
from __future__ import division
from collections import Counter
from itertools import product
from toolz.curried import sliding_window, map, pipe, concat
from toolz.dicttoolz import merge
# Generate all possible transitions
defaults = sc.broadcast(dict(map(
lambda x: ("".join(concat(x)), 0.0),
product(product("HNL", "NL", "HNL"), repeat=2))))
rdd = sc.parallelize(["500, HNL, LNH, NLH, HNL", "600, HNN, NNN, NNN, HNN, LNH"])
def process(line):
"""
>>> process("000, HHH, LLL, NNN")
('000', {'LLLNNN': 0.5, 'HHHLLL': 0.5})
"""
bits = line.split(", ")
transactions = bits[1:]
n = len(transactions) - 1
frequencies = pipe(
sliding_window(2, transactions), # Get all transitions
map(lambda p: "".join(p)), # Joins strings
Counter, # Count
lambda cnt: {k: v / n for (k, v) in cnt.items()} # Get frequencies
)
return bits[0], frequencies
def store_partition(iter):
for (k, v) in iter:
db_insert(k, merge([defaults.value, v]))
rdd.map(process).foreachPartition(store_partition)
既然你知道所有可能的转换,我建议使用稀疏表示并忽略零。此外,您可以用稀疏向量替换字典以减少内存占用。
你可以通过使用纯 Pyspark 来实现这个结果,我确实使用了 pyspark。
要创建频率,假设您已经实现了这些是输入 RDD
ID, SEQ
500, [HNL, LNH, MLH, HML ...]
并获得像 (HNL, LNH),(LNH, MLH)....
这样的频率
inputRDD..map(lambda (k, list): get_frequencies(list)).flatMap(lambda x: x) \
.reduceByKey(lambda v1,v2: v1 +v2)
get_frequencies(states_list):
"""
:param states_list: Its a list of Customer States.
:return: State Frequencies List.
"""
rest = []
tuples_list = []
for idx in range(0,len(states_list)):
if idx + 1 < len(states_list):
tuples_list.append((states_list[idx],states_list[idx+1]))
unique = set(tuples_list)
for value in unique:
rest.append((value, tuples_list.count(value)))
return rest
你会得到结果
((HNL, LNH), 98),((LNH, MLH), 458),() ......
在此之后您可以将结果 RDDs
转换为 Dataframes
或者您可以使用 RDDs mapPartitions
直接插入 DB
我需要一些帮助来让我的大脑围绕在 spark 中设计一个(高效的)马尔可夫链(通过 python)。我已经尽我所能地编写了它,但是我想出的代码无法扩展。基本上对于各个地图阶段,我编写了自定义函数并且它们可以很好地处理几千个序列,但是当我们得到在 20,000 多个(我有一些高达 800k)中,事情进展缓慢。
对于那些不熟悉马尔可夫穆德尔的人,这就是它的要点..
这是我的数据。此时我在 RDD 中获得了实际数据(没有 header)。
ID, SEQ
500, HNL, LNH, MLH, HML
我们查看元组中的序列,所以
(HNL, LNH), (LNH,MLH), etc..
我需要达到这一点.. 我 return 一个字典(针对每一行数据),然后将其序列化并存储在内存数据库中。
{500:
{HNLLNH : 0.333},
{LNHMLH : 0.333},
{MLHHML : 0.333},
{LNHHNL : 0.000},
etc..
}
所以本质上,每个序列都与下一个序列组合(HNL,LNH 变为 'HNLLNH'),然后对于所有可能的转换(序列组合),我们计算它们的出现次数,然后除以总数转换(在本例中为 3)并获得它们的出现频率。
上面有 3 个转换,其中一个是 HNLLNH。所以对于 HNLLNH,1/3 = 0.333
另一方面,我不确定它是否相关,但序列中每个位置的值都是有限的。第一个位置 (H/M/L),第二个位置 (M/L), 第三位置 (H,M,L)。
我的代码之前所做的是收集()rdd,并使用我编写的函数将其映射几次。这些函数首先将字符串变成一个列表,然后将 list[1] 与 list[2] 合并,然后将 list[2] 与 list[3] 合并,然后将 list[3] 与 list[4] 合并,等等。所以我结束了想出这样的东西..
[HNLLNH],[LNHMLH],[MHLHML], etc..
然后下一个函数从该列表中创建一个字典,使用列表项作为键,然后计算该键在完整列表中的总出现次数,除以 len(list) 以获得频率。然后我将该字典连同它的 ID 号一起包装在另一个字典中(产生第二个代码块,在上面)。
就像我说的,这对 small-ish 序列很有效,但对长度超过 100k 的列表不太有效。
此外,请记住,这只是一行数据。我必须对 10-20k 行数据的任何地方执行此操作,数据行的长度在每行 500-800,000 个序列之间变化。
关于如何编写 pyspark 代码(使用 API map/reduce/agg/etc.. 函数)以高效执行此操作的任何建议?
编辑 代码如下。从底部开始可能是有意义的。请记住,我正在学习这个(Python 和 Spark),我不是以此为生,所以我的编码标准不是很好..
def f(x):
# Custom RDD map function
# Combines two separate transactions
# into a single transition state
cust_id = x[0]
trans = ','.join(x[1])
y = trans.split(",")
s = ''
for i in range(len(y)-1):
s= s + str(y[i] + str(y[i+1]))+","
return str(cust_id+','+s[:-1])
def g(x):
# Custom RDD map function
# Calculates the transition state probabilities
# by adding up state-transition occurrences
# and dividing by total transitions
cust_id=str(x.split(",")[0])
trans = x.split(",")[1:]
temp_list=[]
middle = int((len(trans[0])+1)/2)
for i in trans:
temp_list.append( (''.join(i)[:middle], ''.join(i)[middle:]) )
state_trans = {}
for i in temp_list:
state_trans[i] = temp_list.count(i)/(len(temp_list))
my_dict = {}
my_dict[cust_id]=state_trans
return my_dict
def gen_tsm_dict_spark(lines):
# Takes RDD/string input with format CUST_ID(or)PROFILE_ID,SEQ,SEQ,SEQ....
# Returns RDD of dict with CUST_ID and tsm per customer
# i.e. {cust_id : { ('NLN', 'LNN') : 0.33, ('HPN', 'NPN') : 0.66}
# creates a tuple ([cust/profile_id], [SEQ,SEQ,SEQ])
cust_trans = lines.map(lambda s: (s.split(",")[0],s.split(",")[1:]))
with_seq = cust_trans.map(f)
full_tsm_dict = with_seq.map(g)
return full_tsm_dict
def main():
result = gen_tsm_spark(my_rdd)
# Insert into DB
for x in result.collect():
for k,v in x.iteritems():
db_insert(k,v)
您可以试试下面的方法。它在很大程度上依赖于 tooolz
,但如果您希望避免外部依赖,您可以轻松地将其替换为一些标准 Python 库。
from __future__ import division
from collections import Counter
from itertools import product
from toolz.curried import sliding_window, map, pipe, concat
from toolz.dicttoolz import merge
# Generate all possible transitions
defaults = sc.broadcast(dict(map(
lambda x: ("".join(concat(x)), 0.0),
product(product("HNL", "NL", "HNL"), repeat=2))))
rdd = sc.parallelize(["500, HNL, LNH, NLH, HNL", "600, HNN, NNN, NNN, HNN, LNH"])
def process(line):
"""
>>> process("000, HHH, LLL, NNN")
('000', {'LLLNNN': 0.5, 'HHHLLL': 0.5})
"""
bits = line.split(", ")
transactions = bits[1:]
n = len(transactions) - 1
frequencies = pipe(
sliding_window(2, transactions), # Get all transitions
map(lambda p: "".join(p)), # Joins strings
Counter, # Count
lambda cnt: {k: v / n for (k, v) in cnt.items()} # Get frequencies
)
return bits[0], frequencies
def store_partition(iter):
for (k, v) in iter:
db_insert(k, merge([defaults.value, v]))
rdd.map(process).foreachPartition(store_partition)
既然你知道所有可能的转换,我建议使用稀疏表示并忽略零。此外,您可以用稀疏向量替换字典以减少内存占用。
你可以通过使用纯 Pyspark 来实现这个结果,我确实使用了 pyspark。
要创建频率,假设您已经实现了这些是输入 RDD
ID, SEQ
500, [HNL, LNH, MLH, HML ...]
并获得像 (HNL, LNH),(LNH, MLH)....
inputRDD..map(lambda (k, list): get_frequencies(list)).flatMap(lambda x: x) \
.reduceByKey(lambda v1,v2: v1 +v2)
get_frequencies(states_list):
"""
:param states_list: Its a list of Customer States.
:return: State Frequencies List.
"""
rest = []
tuples_list = []
for idx in range(0,len(states_list)):
if idx + 1 < len(states_list):
tuples_list.append((states_list[idx],states_list[idx+1]))
unique = set(tuples_list)
for value in unique:
rest.append((value, tuples_list.count(value)))
return rest
你会得到结果
((HNL, LNH), 98),((LNH, MLH), 458),() ......
在此之后您可以将结果 RDDs
转换为 Dataframes
或者您可以使用 RDDs mapPartitions
DB