为什么 Spark 在 DeltaLake 中写入 Null Table

Why Spark writes Null in DeltaLake Table

我的 Java 应用程序使用 Spark Structured Streaming 连接到 套接字服务器 不断获取传感器测量记录(IoT) 包装在一个 RDMessage 对象中,该对象记录协议中控制的消息类型。 当消息到达时,它们将被检查并使用 Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class).

转换为数据集

虽然流被正确读取并且 RDMeasurement 对象被正确创建,输出流被设置为 None 或零,具体取决于数据类型。当我更改格式 (.format("console")).

时,我在 DeltaFrame table 或控制台中看到了这一点

我错过了什么?出了什么问题?

请参阅下面 Java 代码中最重要的部分

public final class SocketRDMeasurement {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("SSSocketRDMeasurement")
                .master("local[*]")
                .getOrCreate();

        Encoder<StringArray> stringArrayEncoder = Encoders.bean(StringArray.class);
        Encoder<RDMessage> messageEncoder = Encoders.bean(RDMessage.class);
        Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class);

        Dataset<Row> records = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        Dataset<String> inputReceived = records.as(Encoders.STRING());

        Dataset<StringArray> input = inputReceived.as(Encoders.STRING())
                .map((MapFunction<String, StringArray>) x ->
                                new StringArray(x),
                        stringArrayEncoder);

        Dataset<RDMessage> messages = input.map(
                (MapFunction<StringArray, RDMessage>) 
                    r -> new RDMessage(r), messageEncoder);

        Dataset<RDMeasurement> measurements = messages
                .map((MapFunction<RDMessage, RDMeasurement>) r ->
                        new RDMeasurement(), measurementEncoder);

        // The code executes without warning or error but despite the 
        // objects being created correctly the output of dataset is
        // is saved with nulls/nan
        StreamingQuery query = measurements.writeStream()
                .outputMode("append")
                .format("delta")
                .option("checkpointLocation",
                    "/opt/data/delta/_checkpoints/ss-socket-rd-measurement")
                .start("/opt/data/delta/ss-socket-rd-measurement");

        query.awaitTermination();
    }
}

public class StringArray implements Serializable {
    private String[] tokens;
    public StringArray(String tokens) {
        this.tokens = tokens.split(",");
    }

    // getters, setters and toString goes here
}

public class RDMeasurement implements Serializable {
    private String dataSourceName = null;
    private double dt = 0.0f;
    private double t0 = 0f;
    private double endTimestamp = 0L;
    private double[] valuesArray;

    public RDMeasurement() { }

    public RDMeasurement(String dataSourceName, double t0, 
        double dt, double endTimestamp, double[] valuesArray) {

        this.dataSourceName = dataSourceName;
        this.t0 = t0;
        this.dt = dt;
        this.endTimestamp = endTimestamp;
        this.valuesArray = valuesArray;
    }

    // getters, setters and toString goes here
}

public class RDMessage implements Serializable {
    String type;
    RDMeasurement rdMeasurement;

    public RDMessage(String type, RDMeasurement rdMeasurement) {
        this.type = type;
        this.rdMeasurement = rdMeasurement;
    }

    public RDMessage(StringArray stringArray) {
        this(stringArray.getTokens()[0] ,
                new RDMeasurement(stringArray.getTokens()[1],
                        Double.parseDouble(stringArray.getTokens()[2]),
                        Double.parseDouble(stringArray.getTokens()[3]),
                        Double.parseDouble(stringArray.getTokens()[4]),
                        toDoubleArray(5, stringArray))
        );
    }

    private static double[] toDoubleArray(int skip, StringArray stringArray) {
        double[] ret = new double[stringArray.getTokens().length - 5];
        for (int i = 0; i < stringArray.getTokens().length - 5; i++) {
            ret[i] = Double.parseDouble(stringArray.getTokens()[i+skip]);
        }
        return ret;
    }

    // getters, setters and toString goes here
}

每行输入遵循以下格式:

V1_start_rd_0,ds_1,1642442598.266,1.0,1642442618.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_2,ds_3,1642442640.266,1.0,1642442660.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00

重构我的 java 代码并添加用于调试的代码段后,我能够识别错误。

查看重构:

StreamingQuery query = dataStreamReader.load()
        .as(Encoders.STRING())
        .map((MapFunction<String, StringArray>) x -> new StringArray(x),
                stringArrayEncoder)
        .map((MapFunction<StringArray, RDMessage>)
                r -> new RDMessage(r), messageEncoder)
        .map((MapFunction<RDMessage, RDMeasurement>) e ->
                e.getRdMeasurement(), measurementEncoder)
        /*
        .map((MapFunction<RDMeasurement, String>) e -> {
            if (e.getDataSourceName() != null) {
                System.out.println("•••> " + e);
            }
            return e.toString();
        }, Encoders.STRING())
        .map((MapFunction<String, RDMeasurement>) s -> new RDMeasurement(s),
                measurementEncoder) 
        */
        .writeStream()
        .outputMode("append")
        .format("console")
        .start();
query.awaitTermination();

上面评论的代码让我确定了问题所在。