spark RDD中的选择性采样
Selective sampling in spark RDD
我有一个来自记录事件的 RDD 我想对每个类别进行少量采样。
数据如下
|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type2|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type4|xxxx|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type6|xxxx
我的尝试
eventlist = ['type1', 'type2'....]
orginalRDD = sc.textfile("/path/to/file/*.gz").map(lambda x: x.split("|"))
samplelist = []
for event in event list:
eventsample = orginalRDD.filter(lambda x: x[3] == event).take(5).collect()
samplelist.extend(eventsample)
print samplelist
我有两个问题,
1. 有更好的way/efficient方法根据特定条件收集样本吗?
2. 是否可以收集未分割线而不是分割线?
Python 欢迎提出scala建议!
如果样本不必是随机的,像这样的东西应该可以正常工作:
n = ... # Number of elements you want to sample
pairs = orginalRDD.map(lambda x: (x[3], x))
pairs.aggregateByKey(
[], # zero values
lambda acc, x: (acc + [x])[:n], # Add new value a trim to n elements
lambda acc1, acc2: (acc1 + acc2)[:n]) # Combine two accumulators and trim
获取随机样本有点困难。一种可能的方法是在聚合之前添加一个随机值并排序:
import os
import random
def add_random(iter):
seed = int(os.urandom(4).encode('hex'), 16)
rs = random.Random(seed)
for x in iter:
yield (rs.random(), x)
(pairs
.mapPartitions(add_random)
.sortByKey()
.values()
.aggregateByKey(
[],
lambda acc, x: (acc + [x])[:n],
lambda acc1, acc2: (acc1 + acc2)[:n]))
DataFrame
具体解决方案见
我有一个来自记录事件的 RDD 我想对每个类别进行少量采样。
数据如下
|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type2|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type4|xxxx|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type6|xxxx
我的尝试
eventlist = ['type1', 'type2'....]
orginalRDD = sc.textfile("/path/to/file/*.gz").map(lambda x: x.split("|"))
samplelist = []
for event in event list:
eventsample = orginalRDD.filter(lambda x: x[3] == event).take(5).collect()
samplelist.extend(eventsample)
print samplelist
我有两个问题,
1. 有更好的way/efficient方法根据特定条件收集样本吗?
2. 是否可以收集未分割线而不是分割线?
Python 欢迎提出scala建议!
如果样本不必是随机的,像这样的东西应该可以正常工作:
n = ... # Number of elements you want to sample
pairs = orginalRDD.map(lambda x: (x[3], x))
pairs.aggregateByKey(
[], # zero values
lambda acc, x: (acc + [x])[:n], # Add new value a trim to n elements
lambda acc1, acc2: (acc1 + acc2)[:n]) # Combine two accumulators and trim
获取随机样本有点困难。一种可能的方法是在聚合之前添加一个随机值并排序:
import os
import random
def add_random(iter):
seed = int(os.urandom(4).encode('hex'), 16)
rs = random.Random(seed)
for x in iter:
yield (rs.random(), x)
(pairs
.mapPartitions(add_random)
.sortByKey()
.values()
.aggregateByKey(
[],
lambda acc, x: (acc + [x])[:n],
lambda acc1, acc2: (acc1 + acc2)[:n]))
DataFrame
具体解决方案见