spark RDD中的选择性采样

Selective sampling in spark RDD

我有一个来自记录事件的 RDD 我想对每个类别进行少量采样。

数据如下

|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type2|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type3|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type4|xxxx|xxxx|xxxx|xxxx|xxxx
|xxx|xxxx|xxxx|type1|xxxx|xxxx
|xxx|xxxx|xxxx|type6|xxxx

我的尝试

eventlist = ['type1', 'type2'....]
orginalRDD = sc.textfile("/path/to/file/*.gz").map(lambda x: x.split("|"))

samplelist = []
for event in event list:
    eventsample = orginalRDD.filter(lambda x: x[3] == event).take(5).collect()
    samplelist.extend(eventsample)

print samplelist

我有两个问题,
1. 有更好的way/efficient方法根据特定条件收集样本吗?
2. 是否可以收集未分割线而不是分割线?

Python 欢迎提出scala建议!

如果样本不必是随机的,像这样的东西应该可以正常工作:

n = ...  # Number of elements you want to sample
pairs =  orginalRDD.map(lambda x: (x[3], x))

pairs.aggregateByKey(
    [],  # zero values
    lambda acc, x: (acc + [x])[:n],  # Add new value a trim to n elements
    lambda acc1, acc2: (acc1 + acc2)[:n])  # Combine two accumulators and trim

获取随机样本有点困难。一种可能的方法是在聚合之前添加一个随机值并排序:

import os
import random

def add_random(iter):
   seed = int(os.urandom(4).encode('hex'), 16)
   rs = random.Random(seed)
   for x in iter:
       yield (rs.random(), x)

(pairs
    .mapPartitions(add_random)
    .sortByKey()
    .values()
    .aggregateByKey(
        [],
        lambda acc, x: (acc + [x])[:n],
        lambda acc1, acc2: (acc1 + acc2)[:n]))

DataFrame具体解决方案见