在一次操作中使用 spark 通过 reduceByKey 查找值范围
find range of values with reduceByKey using spark in one operation
我正在尝试使用 pyspark 使我的 reduceByKey 函数的输出成为相对于键传递的整数范围。
我试着做一个自定义函数:
def _range(x,y):
return [max(x,y), min(x,y)]
data2 = data_.map(lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'])))
.reduceByKey(lambda x,y: _range(x,y))
当然,输出结果是列表中的列表中的列表
我知道一个解决方案是
.reduceByKey(max)
其次是
.reduceByKey(min)
^^^^然后合并,但我不想执行两个操作
但我想一次完成,这样应用程序的效率就不会低了。我还想避免首先填充整数列表。
有任何想法吗?数据在 RDD 中。
谢谢
这里正确的做法是combineByKey
定义如下:
def seq_op(acc, x):
return (min(x, acc[0]), max(x, acc[1]))
def comb_op(acc1, acc2):
return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))
(pairs
.aggregateByKey((sys.float_info.max, sys.float_info.min), seq_op, comb_op)
.mapValues(lambda minmax: abs(minmax[0] - minmax[1])))
其中 pairs
是以下结果:
pairs = data_.map(
lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp']
)
由于密钥是动态生成的,因此您无法避免初始 map
,因为任何 *byKey
操作都应该预先知道密钥。可以在 combineByKey
内部执行值类型转换,但从根本上讲,它不会影响必须访问数据的次数。
我正在尝试使用 pyspark 使我的 reduceByKey 函数的输出成为相对于键传递的整数范围。
我试着做一个自定义函数:
def _range(x,y):
return [max(x,y), min(x,y)]
data2 = data_.map(lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'])))
.reduceByKey(lambda x,y: _range(x,y))
当然,输出结果是列表中的列表中的列表
我知道一个解决方案是
.reduceByKey(max)
其次是
.reduceByKey(min)
^^^^然后合并,但我不想执行两个操作
但我想一次完成,这样应用程序的效率就不会低了。我还想避免首先填充整数列表。 有任何想法吗?数据在 RDD 中。 谢谢
这里正确的做法是combineByKey
定义如下:
def seq_op(acc, x):
return (min(x, acc[0]), max(x, acc[1]))
def comb_op(acc1, acc2):
return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))
(pairs
.aggregateByKey((sys.float_info.max, sys.float_info.min), seq_op, comb_op)
.mapValues(lambda minmax: abs(minmax[0] - minmax[1])))
其中 pairs
是以下结果:
pairs = data_.map(
lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp']
)
由于密钥是动态生成的,因此您无法避免初始 map
,因为任何 *byKey
操作都应该预先知道密钥。可以在 combineByKey
内部执行值类型转换,但从根本上讲,它不会影响必须访问数据的次数。