Spark 数据集 groupByKey 不起作用 (Java)
Spark Datasets groupByKey doesn't work (Java)
我尝试使用 Dataset 的 groupByKey 方法。我无法弄清楚问题并且找不到任何使用groupByKey的工作示例。
所以让我指出要点,我正在寻找解决方案:
- 我想使用 groupByKey - 有很多使用 groupBy("key").agg(function) 的示例,我知道但不想使用它们(教育目的)
- 我想使用 Java - 许多示例使用 Scala,但我不想使用它。
- 函数最好写成lambda表达式。
这是我所做的:
//Inner class
public static class Bean implements Serializable {
private static final long serialVersionUID = 1L;
private String k;
private int something;
public Bean(String name, int value) {
k = name;
something = value;
}
public String getK() {return k;}
public int getSomething() {return something;}
public void setK(String k) {this.k = k;}
public void setSomething(int something) {this.something = something;}
}
//usage
List<Bean> debugData = new ArrayList<Bean>();
debugData.add(new Bean("Arnold", 18));
debugData.add(new Bean("Bob", 7));
debugData.add(new Bean("Bob", 13));
debugData.add(new Bean("Bob", 15));
debugData.add(new Bean("Alice", 27));
Dataset<Row> df = sqlContext.createDataFrame(debugData, Bean.class);
df.groupByKey(row -> {new Bean(row.getString(0), row.getInt(1));}, Encoders.bean(Bean.class)); //doesn't compile
我得到的错误:
- 不明确的方法调用 - IDE 显示有关 Function1 和 MapFunction 都匹配的警告。
- 无法解析 getString 和 getInt
- 我不能show/print结果
使用 Java 8 λ
df.groupByKey(row -> {
return new Bean(row.getString(0), row.getInt(1));
}, Encoders.bean(Bean.class));
使用MapFunction
df.groupByKey(new MapFunction<Row, Bean>() {
@Override
public Bean call(Row row) throws Exception {
return new Bean(row.getString(0), row.getInt(1));
}
}, Encoders.bean(Bean.class));
出现此错误是因为 groupByKey
有两个重叠的实现。这些方法之一给出 MapFunction
作为第一个参数,第二个给出 Function1
。您的 lambda 代码可以同时转换为它们。所以你应该明确声明你的意图是哪一个。铸造是一个简单的解决方案:
df.groupByKey(row -> (MapFunction<Row, Bean>) new Bean(row.getString(0), row.getInt(1))
, Encoders.bean(Bean.class));
我尝试使用 Dataset 的 groupByKey 方法。我无法弄清楚问题并且找不到任何使用groupByKey的工作示例。
所以让我指出要点,我正在寻找解决方案:
- 我想使用 groupByKey - 有很多使用 groupBy("key").agg(function) 的示例,我知道但不想使用它们(教育目的)
- 我想使用 Java - 许多示例使用 Scala,但我不想使用它。
- 函数最好写成lambda表达式。
这是我所做的:
//Inner class
public static class Bean implements Serializable {
private static final long serialVersionUID = 1L;
private String k;
private int something;
public Bean(String name, int value) {
k = name;
something = value;
}
public String getK() {return k;}
public int getSomething() {return something;}
public void setK(String k) {this.k = k;}
public void setSomething(int something) {this.something = something;}
}
//usage
List<Bean> debugData = new ArrayList<Bean>();
debugData.add(new Bean("Arnold", 18));
debugData.add(new Bean("Bob", 7));
debugData.add(new Bean("Bob", 13));
debugData.add(new Bean("Bob", 15));
debugData.add(new Bean("Alice", 27));
Dataset<Row> df = sqlContext.createDataFrame(debugData, Bean.class);
df.groupByKey(row -> {new Bean(row.getString(0), row.getInt(1));}, Encoders.bean(Bean.class)); //doesn't compile
我得到的错误:
- 不明确的方法调用 - IDE 显示有关 Function1 和 MapFunction 都匹配的警告。
- 无法解析 getString 和 getInt
- 我不能show/print结果
使用 Java 8 λ
df.groupByKey(row -> {
return new Bean(row.getString(0), row.getInt(1));
}, Encoders.bean(Bean.class));
使用MapFunction
df.groupByKey(new MapFunction<Row, Bean>() {
@Override
public Bean call(Row row) throws Exception {
return new Bean(row.getString(0), row.getInt(1));
}
}, Encoders.bean(Bean.class));
出现此错误是因为 groupByKey
有两个重叠的实现。这些方法之一给出 MapFunction
作为第一个参数,第二个给出 Function1
。您的 lambda 代码可以同时转换为它们。所以你应该明确声明你的意图是哪一个。铸造是一个简单的解决方案:
df.groupByKey(row -> (MapFunction<Row, Bean>) new Bean(row.getString(0), row.getInt(1))
, Encoders.bean(Bean.class));