RDD 获取每个元素的类型和索引

RDD get type and index of each element

我正在寻找获取 Java 中每个元素的类型和索引的方法。例如,假设有一个 RDD

['0,1,hi,1.0', '2,3,String,String2', '1.0,2.0,3,String']

那我要

[(0, int),(1, int),(2, String),(3, Double),(0, int) ........]

这样我就可以按键归约并查看每列的数据类型。我在 python 中实现了它,但不确定如何在 Java 中做到这一点。有什么办法吗?这是我在 python

中的做法
def infer_type(partition):
for row in partition:
    value = ""
    idx = 0
    for i in range(len(row)):
        if row[0] == self.prop.comment:
            break
        if row[i] == self.prop.delimiter or i == (len(row) - 1):
            if i == len(row) - 1:
                value += str(row[i])
            if bool(value.strip()) == False:
                yield (idx, 'None')
            elif int_regex_match.match(value):
                yield (idx, 'int')
            elif float_regex_match.match(value):
                yield (idx, 'float')
            else:
                if date_regex_match.match(value):
                    yield (idx, 'date')
                else:
                    yield (idx, 'str')
                idx += 1
                value = ""
        else:
            value += str(row[i])
    rdd = rdd.mapPartitions(infer_type).map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).map(
    lambda x: (x[0][0], (x[0][1], x[1])))

编辑:这就是我现在所做的。但是,我无法获得元组的迭代器。

PairFlatMapFunction map = new PairFlatMapFunction<Iterator<String>, Integer, String>(){

        @Override
        public Iterator<Tuple2<Integer, String>> call(Iterator<String> iterator) throws Exception {
            // TODO Auto-generated method stub
            while(iterator.hasNext()) {
                String[] row = iterator.next().split(",");
                for(int j = 0; j<row.length;j++) {
                    if(row[j].matches(int_regex)) {
                        Tuple2<Integer, String> result =new Tuple2(j, "int");
                        // return iterator of result..?
                    }else if(row[j].matches(float_regex)) {
                        Tuple2<Integer, String> result =new Tuple2(j, "float");
                        // return iterator of result..?
                    }else if(row[j].matches(date_regex_match)) {
                        Tuple2<Integer, String> result =new Tuple2(j, "date");
                        // return iterator of result..?
                    }else {
                        Tuple2<Integer, String> result =new Tuple2(j, "str");
                        // return iterator of result..?
                    }
                }
            }
        }
 };
JavaPairRDD pair_rdd = rdd.mapPartitionsToPair(map, false);

根据您表达的需要,我不明白您为什么使用mapPartition而不是简单的map。这里的另一个错误是您应该使用 flatMapToPair 而不是 mapToPair。

为了实现您想要的效果,您的 flatmap 函数需要将字符串(例如“0,1,hi,1.0”)映射到元组迭代器。为此,您只需创建一个计算结果的 ArrayList 即可:

@Override
public Iterator<Tuple2<Integer, String>> call(String row) throws Exception {
    String[] split_row = row.split(",");
    //create list
    List<Tuple2<Integer, String>> result = new ArrayList<>()
    for(int j = 0; j<split_row.length;j++) {
        if(split_row[j].matches(int_regex)) {
            result.add(new Tuple2(j, "int"));
        } //else ...
    }
    //return the iterator
    return result.iterator();
}

如果您确实需要使用 mapPartition,您可以将相同的逻辑应用于您的函数。