在 Spark SQL 中对具有任意行的数据集使用地图
Using map on dataset with arbitrary rows in Spark SQL
我正在尝试在任意数据集上使用 Dataframe 地图函数。但是我不明白您将如何从 Row-> Row 进行映射。 spark sql 文档中没有给出任意数据的示例:
Dataset<Row> original_data = ...
Dataset<Row> changed_data = original_data.map(new MapFunction<Row,Row>{
@Override
public Row call(Row row) throws Exception {
Row newRow = RowFactory.create(obj1,obj2);
return newRow;
}
}, Encoders.bean(Row.class));
但是这不起作用,因为需要某种编码器?
如何映射到通用行?
如果 obj1 和 obj2 不是原始类型,则将它们的模式表示为 StructType 以创建行编码器。我建议不要使用 Row 类型,而是创建存储 obj1 和 obj2 的自定义 bean,然后在 map
转换中使用该自定义 bean 编码器。
行类型:
StructType customStructType = new StructType();
customStructType = customStructType.add("obj1", DataTypes.< type>, false);
customStructType = customStructType.add("obj2", DataTypes.< type >, false);
ExpressionEncoder<Row> customTypeEncoder = null;
Dataset<Row> changed_data = original_data.map(row->{
return RowFactory.create(obj1,obj2);;
}, RowEncoder.apply(customStructType));
自定义 Bean 类型:
class CustomBean implements ....{
Object obj1;
Object obj2;
....
}
Dataset<CustomBean> changed_data = original_data.map(row->{
return new CustomBean(obj1,obj2);
}, Encoders.bean(CustomBean));
我正在尝试在任意数据集上使用 Dataframe 地图函数。但是我不明白您将如何从 Row-> Row 进行映射。 spark sql 文档中没有给出任意数据的示例:
Dataset<Row> original_data = ...
Dataset<Row> changed_data = original_data.map(new MapFunction<Row,Row>{
@Override
public Row call(Row row) throws Exception {
Row newRow = RowFactory.create(obj1,obj2);
return newRow;
}
}, Encoders.bean(Row.class));
但是这不起作用,因为需要某种编码器? 如何映射到通用行?
如果 obj1 和 obj2 不是原始类型,则将它们的模式表示为 StructType 以创建行编码器。我建议不要使用 Row 类型,而是创建存储 obj1 和 obj2 的自定义 bean,然后在 map
转换中使用该自定义 bean 编码器。
行类型:
StructType customStructType = new StructType();
customStructType = customStructType.add("obj1", DataTypes.< type>, false);
customStructType = customStructType.add("obj2", DataTypes.< type >, false);
ExpressionEncoder<Row> customTypeEncoder = null;
Dataset<Row> changed_data = original_data.map(row->{
return RowFactory.create(obj1,obj2);;
}, RowEncoder.apply(customStructType));
自定义 Bean 类型:
class CustomBean implements ....{
Object obj1;
Object obj2;
....
}
Dataset<CustomBean> changed_data = original_data.map(row->{
return new CustomBean(obj1,obj2);
}, Encoders.bean(CustomBean));