通过 Python 使用 Spark 准备我的大数据
Prepare my bigdata with Spark via Python
我的100m尺寸,量化数据:
(1424411938', [3885, 7898])
(3333333333', [3885, 7898])
想要的结果:
(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])
所以我想要的是转换数据,以便我将 3885(例如)与所有拥有它的 data[0]
分组)。这是我在 python:
中所做的
def prepare(data):
result = []
for point_id, cluster in data:
for index, c in enumerate(cluster):
found = 0
for res in result:
if c == res[0]:
found = 1
if(found == 0):
result.append((c, []))
for res in result:
if c == res[0]:
res[1].append(point_id)
return result
但是当我mapPartitions()
用prepare()
编辑data
RDD时,它似乎只在当前分区中做我想做的,因此return一个更大的结果超出预期。
例如,如果开头的第一条记录在第一个分区中,第二条记录在第二个分区中,那么我会得到结果:
(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])
如何修改我的 prepare()
以获得预期的效果?或者,如何处理 prepare()
产生的结果,以便我可以获得想要的结果?
您可能已经从代码中注意到,我根本不关心速度。
这是一种创建数据的方法:
data = []
from random import randint
for i in xrange(0, 10):
data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)
您可以使用一组基本的 pyspark 转换来实现此目的。
>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))
我们使用flatMap
为x[1]
中的每个项目设置一个键值对,我们将数据行格式更改为(a, x[0])
,这里的a
是x[1]
中的每一项。要更好地理解 flatMap
,您可以查看文档。
>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))
我们只是将所有键值对按键分组,并使用元组函数将可迭代对象转换为元组。
>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]
正如您所说,您可以使用 [:150] 来获得前 150 个元素,我想这将是正确的用法:
r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])[:150]))
我尽量解释清楚。希望对您有所帮助。
我的100m尺寸,量化数据:
(1424411938', [3885, 7898])
(3333333333', [3885, 7898])
想要的结果:
(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])
所以我想要的是转换数据,以便我将 3885(例如)与所有拥有它的 data[0]
分组)。这是我在 python:
def prepare(data):
result = []
for point_id, cluster in data:
for index, c in enumerate(cluster):
found = 0
for res in result:
if c == res[0]:
found = 1
if(found == 0):
result.append((c, []))
for res in result:
if c == res[0]:
res[1].append(point_id)
return result
但是当我mapPartitions()
用prepare()
编辑data
RDD时,它似乎只在当前分区中做我想做的,因此return一个更大的结果超出预期。
例如,如果开头的第一条记录在第一个分区中,第二条记录在第二个分区中,那么我会得到结果:
(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])
如何修改我的 prepare()
以获得预期的效果?或者,如何处理 prepare()
产生的结果,以便我可以获得想要的结果?
您可能已经从代码中注意到,我根本不关心速度。
这是一种创建数据的方法:
data = []
from random import randint
for i in xrange(0, 10):
data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)
您可以使用一组基本的 pyspark 转换来实现此目的。
>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))
我们使用flatMap
为x[1]
中的每个项目设置一个键值对,我们将数据行格式更改为(a, x[0])
,这里的a
是x[1]
中的每一项。要更好地理解 flatMap
,您可以查看文档。
>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))
我们只是将所有键值对按键分组,并使用元组函数将可迭代对象转换为元组。
>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]
正如您所说,您可以使用 [:150] 来获得前 150 个元素,我想这将是正确的用法:
r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])[:150]))
我尽量解释清楚。希望对您有所帮助。