聚合和组合 RDD 的正确方法
The right way to aggregate and combine RDDs
我有一个客户 table,它为每个客户托管有关多个流程的信息。
目标是为每个客户和每个流程提取特征。这意味着每个特征主要是对 .groupby(customerID, processID)
对象的聚合或排序比较计算。
但是,我们的目标是随着时间的推移能够添加越来越多的功能。所以基本上用户应该能够定义一个带有一些过滤器、指标和聚合的新函数,并将这个新函数添加到一个函数池中,该函数池在 table.
上运行
输出应该是一个 customerID,processID table,具有所有特征。
所以我开始一个最小的工作示例:
l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\
('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\
('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\
('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp'])
+--------+------+-----+---------+
|customid|procid|speed|timestamp|
+--------+------+-----+---------+
| CM1| aa1| 100| 0.1|
| CM1| aa1| 110| 0.2|
| CM1| aa1| 110| 0.9|
| CM1| aa1| 100| 1.5|
| CX2| bb9| 100| 0.1|
| CX2| bb9| 100| 0.2|
| CX2| bb9| 110| 6.0|
| CX2| bb9| 100| 0.18|
+--------+------+-----+---------+
然后我定义了 2 个任意特征,它们被这些函数提取:
def extr_ft_1 (proc_data, limit=100):
proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed))
proc_data = proc_data.select(col('count(speed)').alias('speed_feature'))
proc_data.show()
return proc_data
def extr_ft_0 (proc_data):
max_t = proc_data.agg(spark_max(proc_data.timestamp))
min_t = proc_data.agg(spark_min(proc_data.timestamp))
max_t = max_t.select(col('max(timestamp)').alias('max'))
min_t = min_t.select(col('min(timestamp)').alias('min'))
X = max_t.crossJoin(min_t)
X = X.withColumn('time_feature', X.max+X.min)
X = X.drop(X.min).drop(X.max)
X.show()
return (X)
它们 return 只包含一个聚合值的 1 元素 RRD。
接下来,将所有特征函数应用于给定进程,并在每个进程的结果 RDD 中组合:
def get_proc_features(proc, data, *features):
proc_data = data.filter( data.customid == proc)
features_for_proc = [feature_value(proc_data) for feature_value in features]
for number, feature in enumerate(features_for_proc):
if number == 0:
l = [(proc,'dummy')]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','dummy'])
df = df.drop(df.dummy)
df.show()
features_for_proc_rdd = feature
features_for_proc_rdd = features_for_proc_rdd.crossJoin(df)
continue
features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature)
features_for_proc_rdd.show()
return features_for_proc_rdd
他们的最后一步是将包含每个进程的特征的所有行附加到一个数据帧:
for number, proc in enumerate(customer_list_1):
if number == 0:
#results = get_trip_features(trip, df, extr_ft_0, extr_ft_1)
results = get_proc_features(proc, df, *extr_feature_funcs)
continue
results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs))
results.show()
转换链是这样的:
为客户 1 获取功能 1 和 2:
+------------+
|time_feature|
+------------+
| 1.6|
+------------+
+-------------+
|speed_feature|
+-------------+
| 2|
+-------------+
将它们合并为:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
+------------+--------+-------------+
对客户 2 做同样的事情并将所有 RDD 追加到最终结果 RDD:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
| 6.1| CX2| 1|
+------------+--------+-------------+
如果我 运行 集群上的代码,它适用于 2 个客户。
但是当我对合理数量的客户进行测试时,我遇到的主要是 GC 和堆内存错误。
我在这里使用很多 RDD 吗?恐怕我的代码效率很低,但我不知道从哪里开始优化它。我想我只是在最后调用一个动作(我在实时模式下放弃所有 shows() 并且只收集()最后一个 RDD)。
非常感谢您的帮助。
您的代码需要重构,问题不在于 RDD,而在于您过滤它以处理单一键然后交叉连接。遍历值会使您失去 pyspark 的分布式方面。请记住,如果您不需要其他作品的功能,则应始终保留您的作品 table。
最好的方法是使用数据帧和 window 函数。
首先让我们重写您的函数:
import pyspark.sql.functions as psf
def extr_ft_1 (proc_data, w, limit=100):
return proc_data.withColumn(
"speed_feature",
psf.sum((proc_data.speed > limit).cast("int")).over(w)
)
def extr_ft_0(proc_data, w):
return proc_data.withColumn(
"time_feature",
psf.min(proc_data.timestamp).over(w) + psf.max(proc_data.timestamp).over(w)
)
其中 w
是 window 规格:
from pyspark.sql import Window
w = Window.partitionBy("customid")
df1 = extr_ft_1(df, w)
df0 = extr_ft_0(df1, w)
df0.show()
+--------+------+-----+---------+-------------+------------+
|customid|procid|speed|timestamp|speed_feature|time_feature|
+--------+------+-----+---------+-------------+------------+
| CM1| aa1| 100| 0.1| 2| 1.6|
| CM1| aa1| 110| 0.2| 2| 1.6|
| CM1| aa1| 110| 0.9| 2| 1.6|
| CM1| aa1| 100| 1.5| 2| 1.6|
| CX2| bb9| 100| 0.1| 1| 6.1|
| CX2| bb9| 100| 0.2| 1| 6.1|
| CX2| bb9| 110| 6.0| 1| 6.1|
| CX2| bb9| 100| 0.18| 1| 6.1|
+--------+------+-----+---------+-------------+------------+
在这里我们永远不会丢失信息(我们保留所有行)所以如果您想添加额外的功能,您可以。如果您想要最终的汇总结果,只需 运行 通过 groupBy("customid")
.
请注意,您还可以修改 window 规范中的聚合键以包含 procid
,例如。
我有一个客户 table,它为每个客户托管有关多个流程的信息。
目标是为每个客户和每个流程提取特征。这意味着每个特征主要是对 .groupby(customerID, processID)
对象的聚合或排序比较计算。
但是,我们的目标是随着时间的推移能够添加越来越多的功能。所以基本上用户应该能够定义一个带有一些过滤器、指标和聚合的新函数,并将这个新函数添加到一个函数池中,该函数池在 table.
上运行输出应该是一个 customerID,processID table,具有所有特征。
所以我开始一个最小的工作示例:
l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\
('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\
('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\
('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp'])
+--------+------+-----+---------+
|customid|procid|speed|timestamp|
+--------+------+-----+---------+
| CM1| aa1| 100| 0.1|
| CM1| aa1| 110| 0.2|
| CM1| aa1| 110| 0.9|
| CM1| aa1| 100| 1.5|
| CX2| bb9| 100| 0.1|
| CX2| bb9| 100| 0.2|
| CX2| bb9| 110| 6.0|
| CX2| bb9| 100| 0.18|
+--------+------+-----+---------+
然后我定义了 2 个任意特征,它们被这些函数提取:
def extr_ft_1 (proc_data, limit=100):
proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed))
proc_data = proc_data.select(col('count(speed)').alias('speed_feature'))
proc_data.show()
return proc_data
def extr_ft_0 (proc_data):
max_t = proc_data.agg(spark_max(proc_data.timestamp))
min_t = proc_data.agg(spark_min(proc_data.timestamp))
max_t = max_t.select(col('max(timestamp)').alias('max'))
min_t = min_t.select(col('min(timestamp)').alias('min'))
X = max_t.crossJoin(min_t)
X = X.withColumn('time_feature', X.max+X.min)
X = X.drop(X.min).drop(X.max)
X.show()
return (X)
它们 return 只包含一个聚合值的 1 元素 RRD。 接下来,将所有特征函数应用于给定进程,并在每个进程的结果 RDD 中组合:
def get_proc_features(proc, data, *features):
proc_data = data.filter( data.customid == proc)
features_for_proc = [feature_value(proc_data) for feature_value in features]
for number, feature in enumerate(features_for_proc):
if number == 0:
l = [(proc,'dummy')]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','dummy'])
df = df.drop(df.dummy)
df.show()
features_for_proc_rdd = feature
features_for_proc_rdd = features_for_proc_rdd.crossJoin(df)
continue
features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature)
features_for_proc_rdd.show()
return features_for_proc_rdd
他们的最后一步是将包含每个进程的特征的所有行附加到一个数据帧:
for number, proc in enumerate(customer_list_1):
if number == 0:
#results = get_trip_features(trip, df, extr_ft_0, extr_ft_1)
results = get_proc_features(proc, df, *extr_feature_funcs)
continue
results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs))
results.show()
转换链是这样的:
为客户 1 获取功能 1 和 2:
+------------+
|time_feature|
+------------+
| 1.6|
+------------+
+-------------+
|speed_feature|
+-------------+
| 2|
+-------------+
将它们合并为:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
+------------+--------+-------------+
对客户 2 做同样的事情并将所有 RDD 追加到最终结果 RDD:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
| 6.1| CX2| 1|
+------------+--------+-------------+
如果我 运行 集群上的代码,它适用于 2 个客户。 但是当我对合理数量的客户进行测试时,我遇到的主要是 GC 和堆内存错误。
我在这里使用很多 RDD 吗?恐怕我的代码效率很低,但我不知道从哪里开始优化它。我想我只是在最后调用一个动作(我在实时模式下放弃所有 shows() 并且只收集()最后一个 RDD)。 非常感谢您的帮助。
您的代码需要重构,问题不在于 RDD,而在于您过滤它以处理单一键然后交叉连接。遍历值会使您失去 pyspark 的分布式方面。请记住,如果您不需要其他作品的功能,则应始终保留您的作品 table。
最好的方法是使用数据帧和 window 函数。
首先让我们重写您的函数:
import pyspark.sql.functions as psf
def extr_ft_1 (proc_data, w, limit=100):
return proc_data.withColumn(
"speed_feature",
psf.sum((proc_data.speed > limit).cast("int")).over(w)
)
def extr_ft_0(proc_data, w):
return proc_data.withColumn(
"time_feature",
psf.min(proc_data.timestamp).over(w) + psf.max(proc_data.timestamp).over(w)
)
其中 w
是 window 规格:
from pyspark.sql import Window
w = Window.partitionBy("customid")
df1 = extr_ft_1(df, w)
df0 = extr_ft_0(df1, w)
df0.show()
+--------+------+-----+---------+-------------+------------+
|customid|procid|speed|timestamp|speed_feature|time_feature|
+--------+------+-----+---------+-------------+------------+
| CM1| aa1| 100| 0.1| 2| 1.6|
| CM1| aa1| 110| 0.2| 2| 1.6|
| CM1| aa1| 110| 0.9| 2| 1.6|
| CM1| aa1| 100| 1.5| 2| 1.6|
| CX2| bb9| 100| 0.1| 1| 6.1|
| CX2| bb9| 100| 0.2| 1| 6.1|
| CX2| bb9| 110| 6.0| 1| 6.1|
| CX2| bb9| 100| 0.18| 1| 6.1|
+--------+------+-----+---------+-------------+------------+
在这里我们永远不会丢失信息(我们保留所有行)所以如果您想添加额外的功能,您可以。如果您想要最终的汇总结果,只需 运行 通过 groupBy("customid")
.
请注意,您还可以修改 window 规范中的聚合键以包含 procid
,例如。