在 Spark SQL 中对具有任意行的数据集使用地图

Using map on dataset with arbitrary rows in Spark SQL

我正在尝试在任意数据集上使用 Dataframe 地图函数。但是我不明白您将如何从 Row-> Row 进行映射。 spark sql 文档中没有给出任意数据的示例:

Dataset<Row> original_data = ...
Dataset<Row> changed_data = original_data.map(new MapFunction<Row,Row>{
            @Override
            public Row call(Row row) throws Exception {
                Row newRow = RowFactory.create(obj1,obj2);
                return newRow;
            }
}, Encoders.bean(Row.class));

但是这不起作用,因为需要某种编码器? 如何映射到通用行?

如果 obj1 和 obj2 不是原始类型,则将它们的模式表示为 StructType 以创建行编码器。我建议不要使用 Row 类型,而是创建存储 obj1 和 obj2 的自定义 bean,然后在 map 转换中使用该自定义 bean 编码器。

行类型:

StructType customStructType = new StructType();
        customStructType = customStructType.add("obj1", DataTypes.< type>, false);
        customStructType = customStructType.add("obj2", DataTypes.< type >, false);
        ExpressionEncoder<Row> customTypeEncoder = null;

        Dataset<Row> changed_data = original_data.map(row->{
            return RowFactory.create(obj1,obj2);;
    }, RowEncoder.apply(customStructType));

自定义 Bean 类型:

class CustomBean implements ....{
    Object obj1;
    Object obj2;
....
}

Dataset<CustomBean> changed_data = original_data.map(row->{
                return new CustomBean(obj1,obj2);
        }, Encoders.bean(CustomBean));