How to use aggregateByKey on JavaPairRDD in Java?

I have searched a lot, but couldn't find an example of aggregateByKey in Java code.

I want to count the number of rows in a JavaPairRDD, reduced by key.

I read that aggregateByKey is the best way to do this, but I am using Java rather than Scala and I can't figure out how to use it from Java.

Please help!

For example:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]

I want to do exactly what my example shows: add an extra column to each output row holding the count of the rows reduced into it.

Thanks!

I'm not sure exactly what you are trying to do, but I can offer a solution that produces the output you need. aggregateByKey won't do what you expect: it is just a way of combining values within an RDD, whereas on a DataFrame the equivalent aggregation behaves more like what you are expecting. In any case, the code below gives you the output you want.

JavaPairRDD<String, Iterable<String>> groups = pairs.groupByKey();

JavaPairRDD<String, String> counts = groups.mapToPair(
        new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {

    public Tuple2<String, String> call(Tuple2<String, Iterable<String>> arg0) throws Exception {
        // Count the occurrences of each distinct value within the group.
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        Iterator<String> itr = arg0._2.iterator();
        while (itr.hasNext()) {
            String val = itr.next();
            if (counts.get(val) == null) {
                counts.put(val, 1);
            } else {
                counts.put(val, counts.get(val) + 1);
            }
        }
        return new Tuple2<String, String>(arg0._1, counts.toString());
    }
});

Give it a try and let me know what you think. Note that, frankly speaking, this isn't really combining, since combining wouldn't do this kind of thing.
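That said, aggregateByKey can express a per-key count directly: you supply a zero value, a seqOp that folds one value into the per-key accumulator within a partition, and a combOp that merges accumulators across partitions. The sketch below simulates those semantics in plain Java (no Spark cluster needed; the partition layout and record strings are made up for illustration):

```java
import java.util.*;
import java.util.function.*;

public class AggregateByKeySketch {
    // Simulates aggregateByKey: fold each (key, value) pair into a per-key
    // accumulator with seqOp inside each "partition", then merge the
    // per-partition accumulators with combOp.
    static <K, V, A> Map<K, A> aggregateByKey(
            List<List<Map.Entry<K, V>>> partitions,
            Supplier<A> zero,
            BiFunction<A, V, A> seqOp,
            BinaryOperator<A> combOp) {
        Map<K, A> result = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, A> local = new HashMap<>();
            for (Map.Entry<K, V> e : partition) {
                A acc = local.getOrDefault(e.getKey(), zero.get());
                local.put(e.getKey(), seqOp.apply(acc, e.getValue()));
            }
            for (Map.Entry<K, A> e : local.entrySet()) {
                result.merge(e.getKey(), e.getValue(), combOp);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions of (key, record) pairs, as in the question.
        List<List<Map.Entry<String, String>>> partitions = List.of(
            List.of(Map.entry("key1", "name:abc,email:def,address:ghi"),
                    Map.entry("key1", "name:abc,email:def,address:ghi")),
            List.of(Map.entry("key2", "name:abc,email:def,address:ghi")));

        // Counting rows per key: zero = 0, seqOp adds 1, combOp adds counts.
        Map<String, Integer> counts = aggregateByKey(
            partitions, () -> 0, (acc, v) -> acc + 1, Integer::sum);

        System.out.println(new TreeMap<>(counts)); // sorted for stable output
    }
}
```

With the real API, the same three arguments go to `JavaPairRDD.aggregateByKey(zeroValue, seqFunc, combFunc)`.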

Here is an example of how I aggregate by key in Java:
JavaPairRDD<String, Row> result = inputDataFrame.javaRDD().mapToPair(new  PairFunction<Row, String, Row>() {
    private static final long serialVersionUID = 1L;
    public Tuple2<String, Row> call(Row tblRow) throws Exception {
        String strID= CommonConstant.BLANKSTRING;
        Object[] newRow = new Object[schemaSize];
        for(String s: matchKey)
        {
            if(tblRow.apply(finalSchema.get(s))!=null){
                strID+= tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
            }                           
        }   
        int rowSize=    tblRow.length();
        for (int itr = 0; itr < rowSize; itr++)
        {
            if(tblRow.apply(itr)!=null)
            {
                newRow[itr] = tblRow.apply(itr);
            }
        }
        newRow[idIndex]= Utils.generateKey(strID);
        return new Tuple2<String, Row>(strID,RowFactory.create(newRow));
    }
}).aggregateByKey(RowFactory.create(arr), new Function2<Row,Row,Row>(){

    private static final long serialVersionUID = 1L;

    public Row call(Row argRow1, Row argRow2) throws Exception {
        Integer rowThreshold=   dataSchemaHashMap.get(CommonConstant.STR_TEMPThreshold);
        Object[] newRow = new Object[schemaSize];
        int rowSize=    argRow1.length();

        for (int itr = 0; itr < rowSize; itr++)
        {
            if(argRow1!=null && argRow2!=null)
            {
                if(argRow1.apply(itr)!=null && argRow2.apply(itr)!=null)
                {
                    if(itr==rowSize-1){
                        newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())+Integer.parseInt(argRow2.apply(itr).toString());
                    }else{
                        newRow[itr] = argRow2.apply(itr);
                    }
                }
            }
        }

        return RowFactory.create(newRow);

    }

}, new Function2<Row,Row,Row>(){
    private static final long serialVersionUID = 1L;

    public Row call(Row v1, Row v2) throws Exception {
        // Note: returning v1 unchanged drops counts accumulated in other
        // partitions; this assumes all values for a key are merged by the seqOp.
        return v1;
    }
});

JavaRDD<Row> result1 = result.map(new Function<Tuple2<String,Row>, Row>() {
    private static final long serialVersionUID = -5480405270683046298L;
    public Row call(Tuple2<String, Row> rddRow) throws Exception {
        return rddRow._2();
    }
});

Data file: average.txt

student_Name,subject,marks

ss,English,80

ss,Maths,60

GG,English,180

PP,English,80

PI,English,80

GG,Maths,100

PP,Maths,810

PI,Maths,800

The problem is to find the subject-wise average using the aggregateByKey Spark transformation in Java 8.

Here is one way to do it:

    JavaRDD<String> baseRDD = jsc.textFile("average.txt");
    JavaPairRDD<String, Integer> studentRDD = baseRDD.mapToPair(
            s -> new Tuple2<String, Integer>(s.split(",")[1], Integer.parseInt(s.split(",")[2])));
    JavaPairRDD<String, Avg> avgRDD = studentRDD.aggregateByKey(
            new Avg(0, 0),                                          // zero value
            (v, x) -> new Avg(v.getSum() + x, v.getNum() + 1),      // seqOp: fold one mark in
            (v1, v2) -> new Avg(v1.getSum() + v2.getSum(),          // combOp: merge partition
                                v1.getNum() + v2.getNum()));        //         accumulators

    Map<String,Avg> mapAvg = avgRDD.collectAsMap();

    for(Entry<String,Avg> entry : mapAvg.entrySet()){
        System.out.println(entry.getKey()+"::"+entry.getValue().getAvg());
    }



import java.io.Serializable;

public class Avg implements Serializable {

    private static final long serialVersionUID = 1L;

    private int sum;
    private int num;

    public Avg(int sum, int num) {
        this.sum = sum;
        this.num = num;
    }

    // Cast before dividing: plain (this.sum / this.num) would truncate
    // the average to an integer before widening it to double.
    public double getAvg() { return (double) this.sum / this.num; }

    public int getSum() { return this.sum; }

    public int getNum() { return this.num; }
}
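The expected output can be verified without a Spark cluster. The sketch below hardcodes the sample lines from average.txt and folds them with the same sum-and-count logic the Avg accumulator uses (a plain-Java stand-in, not the Spark API):

```java
import java.util.*;

public class SubjectAverageSketch {
    public static void main(String[] args) {
        // Sample rows from average.txt (student_Name, subject, marks).
        String[] lines = {
            "ss,English,80", "ss,Maths,60", "GG,English,180",
            "PP,English,80", "PI,English,80", "GG,Maths,100",
            "PP,Maths,810", "PI,Maths,800"
        };

        // Per-subject {sum, count} accumulators, mirroring the Avg class.
        Map<String, int[]> acc = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            int[] a = acc.computeIfAbsent(parts[1], k -> new int[2]);
            a[0] += Integer.parseInt(parts[2]); // running sum of marks
            a[1] += 1;                          // running count of rows
        }

        // Divide as double so the average isn't truncated.
        for (Map.Entry<String, int[]> e : acc.entrySet()) {
            double avg = (double) e.getValue()[0] / e.getValue()[1];
            System.out.println(e.getKey() + "::" + avg);
        }
    }
}
```

For this data the averages are English = 420/4 = 105.0 and Maths = 1770/4 = 442.5, which is what the Spark version should print as well.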