如何按多个字段对 JavaRDD<Row> 进行排序,并将特定数据保留在 Java Spark 中
How to sort JavaRDD<Row> by multiple fields and just reserve the particular data in Java Spark
我有一个类型为 JavaRDD<Row>
的输入数据。
Row
有两个字段。
[
{"fieldName":"requestId", "fieldType":"String"},
{"fieldName":"price", "fieldType":"double"}
]
requestId
和 price
可以在许多 Rows
中重复。我的目的是从那些 requestId
的 Rows
中保留最大 price
的 Row
。实际上,即使不使用排序,任何方法都可以。
比如输入是这样的:
76044601-8029-4e09-9708-41dd125ae4bb 1676.304091136485
76044601-8029-4e09-9708-41dd125ae4bb 3898.9987591932413
ad0acb4a-100d-4624-b863-fcf275ce28db 7518.603722172683
76044601-8029-4e09-9708-41dd125ae4bb 3308.4421575701463
26f639bc-2041-435c-86da-73b997c0cc64 1737.7186292370193
beeb7fc1-2a2d-4943-8237-c281ee7c9617 4941.882928279789
26f639bc-2041-435c-86da-73b997c0cc64 1710.328581775302
输出数据应该是这样的(输出顺序没有问题):
76044601-8029-4e09-9708-41dd125ae4bb 3898.9987591932413
ad0acb4a-100d-4624-b863-fcf275ce28db 7518.603722172683
26f639bc-2041-435c-86da-73b997c0cc64 1737.7186292370193
beeb7fc1-2a2d-4943-8237-c281ee7c9617 4941.882928279789
候选方法:
JavaRDD<Row> javaRDD = dataFrame.toJavaRDD().mapToPair(new PairFunction<Row, String, Row>() {
@Override
public Tuple2<String, Row> call(Row row) {
String key = String.valueOf(row.getAs("requestid"));
return new Tuple2<String, Row>(key, row);
}
}).reduceByKey(new Function2<Row, Row, Row>() {
@Override
public Row call(Row row1, Row row2) throws Exception {
double rs1 = Double.parseDouble(String.valueOf(row1.getAs("price")));
double rs2 = Double.parseDouble(String.valueOf(row2.getAs("price")));
if (rs1 < rs2) {
return row2;
} else {
return row1;
}
}
}).map(new Function<Tuple2<String, Row>, Row>() {
@Override
public Row call(Tuple2<String, Row> tuple) {
return tuple._2;
}
});
首先,您必须将原始数据制作成JavaRDD对象。
并配合mapToPair功能,将数据格式设为key-value类型。(key : requestId, value: price)
并使用reduceByKey函数,选择最高价格作为key的值。
那么JavaRDD就是你想要的结果。
您应该使用 groupByKey,而不是 reduceByKey,然后对 groupby 结果进行排序。
有一个简单的方法可以做到这一点。
只需使用groupBy
然后max
,您将得到结果而无需解析为JavaRDD
。
df.groupBy("requestId").max("price").show();
测试
输入:
{"requestId": "1", "price": 10}
{"requestId": "1", "price": 15}
{"requestId": "1", "price": 19}
{"requestId": "2", "price": 20}
{"requestId": "2", "price": 21}
{"requestId": "2", "price": 26}
{"requestId": "3", "price": 30}
{"requestId": "3", "price": 38}
我有:
+---------+----------+
|requestId|max(price)|
+---------+----------+
| 1| 19|
| 2| 26|
| 3| 38|
+---------+----------+
我有一个类型为 JavaRDD<Row>
的输入数据。
Row
有两个字段。
[
{"fieldName":"requestId", "fieldType":"String"},
{"fieldName":"price", "fieldType":"double"}
]
requestId
和 price
可以在许多 Rows
中重复。我的目的是从那些 requestId
的 Rows
中保留最大 price
的 Row
。实际上,即使不使用排序,任何方法都可以。
比如输入是这样的:
76044601-8029-4e09-9708-41dd125ae4bb 1676.304091136485
76044601-8029-4e09-9708-41dd125ae4bb 3898.9987591932413
ad0acb4a-100d-4624-b863-fcf275ce28db 7518.603722172683
76044601-8029-4e09-9708-41dd125ae4bb 3308.4421575701463
26f639bc-2041-435c-86da-73b997c0cc64 1737.7186292370193
beeb7fc1-2a2d-4943-8237-c281ee7c9617 4941.882928279789
26f639bc-2041-435c-86da-73b997c0cc64 1710.328581775302
输出数据应该是这样的(输出顺序没有问题):
76044601-8029-4e09-9708-41dd125ae4bb 3898.9987591932413
ad0acb4a-100d-4624-b863-fcf275ce28db 7518.603722172683
26f639bc-2041-435c-86da-73b997c0cc64 1737.7186292370193
beeb7fc1-2a2d-4943-8237-c281ee7c9617 4941.882928279789
候选方法:
JavaRDD<Row> javaRDD = dataFrame.toJavaRDD().mapToPair(new PairFunction<Row, String, Row>() {
@Override
public Tuple2<String, Row> call(Row row) {
String key = String.valueOf(row.getAs("requestid"));
return new Tuple2<String, Row>(key, row);
}
}).reduceByKey(new Function2<Row, Row, Row>() {
@Override
public Row call(Row row1, Row row2) throws Exception {
double rs1 = Double.parseDouble(String.valueOf(row1.getAs("price")));
double rs2 = Double.parseDouble(String.valueOf(row2.getAs("price")));
if (rs1 < rs2) {
return row2;
} else {
return row1;
}
}
}).map(new Function<Tuple2<String, Row>, Row>() {
@Override
public Row call(Tuple2<String, Row> tuple) {
return tuple._2;
}
});
首先,您必须将原始数据制作成JavaRDD对象。
并配合mapToPair功能,将数据格式设为key-value类型。(key : requestId, value: price)
并使用reduceByKey函数,选择最高价格作为key的值。
那么JavaRDD就是你想要的结果。
您应该使用 groupByKey,而不是 reduceByKey,然后对 groupby 结果进行排序。
有一个简单的方法可以做到这一点。
只需使用groupBy
然后max
,您将得到结果而无需解析为JavaRDD
。
df.groupBy("requestId").max("price").show();
测试
输入:
{"requestId": "1", "price": 10}
{"requestId": "1", "price": 15}
{"requestId": "1", "price": 19}
{"requestId": "2", "price": 20}
{"requestId": "2", "price": 21}
{"requestId": "2", "price": 26}
{"requestId": "3", "price": 30}
{"requestId": "3", "price": 38}
我有:
+---------+----------+
|requestId|max(price)|
+---------+----------+
| 1| 19|
| 2| 26|
| 3| 38|
+---------+----------+