Spark Structured Streaming - Event processing with Window operation in stateful stream processing

I am new to Spark Structured Streaming and I am currently working on a use case where a Structured Streaming application receives events from Azure IoT Hub - Event Hub (say, one event every 20 seconds).

The task is to consume those events and process them in real time. For that I have written the Spark Structured Streaming program below in Spark Java.

Here are the important points:

  1. Currently I apply a window operation with a 10-minute window interval and a 5-minute sliding window.
  2. A watermark of 10 minutes is applied on the eventDate parameter.
  3. Currently I do not perform any other operation; I just store the result in Parquet format at the specified location.
  4. The program stores one event per file.

Questions:

  1. Is it possible to store multiple events in a single Parquet file based on the window time?
  2. How does the window operation work in this case?
  3. Also, I would like to check the event's state against the previous event and, based on some calculation (say an event is not received within 5 minutes), update the state.

...

public class EventSubscriber {

   public static void main(String args[]) throws InterruptedException, StreamingQueryException {

    String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";

    String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();

    EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
            .setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
            .setReceiverTimeout(java.time.Duration.ofMinutes(10));

    SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");

    SparkSession spSession = SparkSession.builder()
            .appName("IoT Spark Streaming")
            .config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
            .getOrCreate();

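    // Read the raw event stream from Azure Event Hubs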
    Dataset<Row> inputStreamDF = spSession.readStream()
            .format("eventhubs")
            .options(eventHubsConf.toMap())
            .load();

    Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");

    StructType jsonStruct = new StructType()
            .add("eventType", DataTypes.StringType)
            .add("payload", DataTypes.StringType);

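    // Parse each JSON body with Gson and tag it as DEVICE_EVENT or OTHER_EVENT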
    Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
        String valStr = value.getString(0).toString();

        String payload = valStr;

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);

        JsonElement methodName = jsonObj.get("method");

        String eventType = null;
        if(methodName != null) {
            eventType = "OTHER_EVENT";
        } else {
            eventType = "DEVICE_EVENT";
        }

        Row jsonRow = RowFactory.create(eventType, payload);
        return jsonRow;

    }, RowEncoder.apply(jsonStruct));

    messageRow.printSchema();

    Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");

    deviceEventRowDS.printSchema();

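    // Deserialize the device payload into DeviceEvent beans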
    Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {

        String jsonString = value.getString(1).toString();

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
        DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
        return deviceEvent;

    }, Encoders.bean(DeviceEvent.class));

    deviceEventDS.printSchema();

    Dataset<Row> messageDataset = deviceEventDS.select(
            functions.col("eventType"), 
            functions.col("deviceID"),
            functions.col("description"),
            functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
            functions.col("deviceModel"),
            functions.col("pingRate"))
            .select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");

    messageDataset.printSchema();

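    // 10-minute watermark on eventDate, then count events per deviceID over a 10-minute window sliding every 5 minutes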
    Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
            .groupBy(functions.col("deviceID"),
                    functions.window(
                            functions.col("eventDate"), "10 minutes", "5 minutes"))
            .count();

    devWindowDataset.printSchema();

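    // Write the windowed counts to Parquet in append mode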
    StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
            .format("parquet")
            .option("truncate", "false")
            .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
            .start();

    query.awaitTermination();
}}

...

Any help or guidance on this would be appreciated.

Thanks and regards,

Avinash Deshmukh

Is it possible to store multiple events in parquet format in a file based on the window time?

Yes.
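
In append mode each micro-batch writes out all the window results that were finalized in that trigger, so multiple rows end up in the same files; a processing-time trigger makes the batches (and the files) larger, and partitioning by the window start groups the files per window. A minimal sketch, reusing devWindowDataset from the question (the 10-minute trigger and the windowStart column are assumptions for illustration):

// Trigger comes from org.apache.spark.sql.streaming.Trigger
StreamingQuery query = devWindowDataset
    // flatten the window struct into a string column that can be used for partitioning
    .withColumn("windowStart",
        functions.date_format(functions.col("window.start"), "yyyy-MM-dd-HH-mm"))
    .writeStream()
    .outputMode("append")
    .format("parquet")
    // one micro-batch every 10 minutes instead of as fast as possible
    .trigger(Trigger.ProcessingTime("10 minutes"))
    // one output directory per window start
    .partitionBy("windowStart")
    .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
    .start();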

How does the window operation works in this case?

The following code is the most important part of the Spark Structured Streaming application:

Dataset<Row> devWindowDataset = messageDataset
  .withWatermark("eventDate", "10 minutes")
  .groupBy(
    functions.col("deviceID"),
    functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
  .count();

This says that the underlying state store should keep state per deviceID and eventDate window for 10 minutes, plus an additional 10 minutes (per withWatermark) for late events. In other words, you should see results coming out once an event has an eventDate that is 20 minutes past the start of the streaming query.

withWatermark is for late events: even though groupBy would produce a result, it is only emitted once the watermark threshold is crossed.

And the same procedure is applied every 10 minutes (plus the 10-minute watermark), with a window slide of 5 minutes.

Think of groupBy with the window operator as a multi-column aggregation.
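
For example, the grouping above produces a window struct column with start and end fields that can be selected just like the other grouping columns (a small illustration using the names from the question):

devWindowDataset
    .select(
        functions.col("deviceID"),
        functions.col("window.start").as("windowStart"),
        functions.col("window.end").as("windowEnd"),
        functions.col("count"))
    .printSchema();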

Also I would like to check the event state with previous event and based on some calculations (say event is not received by 5 minutes) I want to update the state.

That sounds like a use case for the KeyValueGroupedDataset.flatMapGroupsWithState operator (aka Arbitrary Stateful Streaming Aggregation). Consult Arbitrary Stateful Operations.
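
A rough sketch of what that could look like here, assuming the deviceEventDS from the question; DeviceState, its lastEventDate property, and the DeviceEvent getters are illustrative assumptions, not code from the question. With a processing-time timeout the function is also invoked when a device has been silent for 5 minutes, which is where the "event not received" logic can run:

// Hypothetical state bean; Encoders.bean requires Java-bean style getters/setters.
public static class DeviceState implements Serializable {
    private String lastEventDate;
    public String getLastEventDate() { return lastEventDate; }
    public void setLastEventDate(String lastEventDate) { this.lastEventDate = lastEventDate; }
}

// Needs org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction,
// org.apache.spark.sql.streaming.OutputMode and org.apache.spark.sql.streaming.GroupStateTimeout.
Dataset<String> alerts = deviceEventDS
    .groupByKey((MapFunction<DeviceEvent, String>) DeviceEvent::getDeviceID, Encoders.STRING())
    .flatMapGroupsWithState(
        (FlatMapGroupsWithStateFunction<String, DeviceEvent, DeviceState, String>)
            (deviceId, events, state) -> {
                List<String> out = new ArrayList<>();
                if (state.hasTimedOut()) {
                    // no event arrived for this device within the timeout
                    out.add(deviceId + ": no event received in the last 5 minutes");
                    state.remove();
                } else {
                    DeviceState current = state.exists() ? state.get() : new DeviceState();
                    while (events.hasNext()) {
                        current.setLastEventDate(events.next().getEventDate());
                    }
                    state.update(current);
                    state.setTimeoutDuration("5 minutes"); // re-arm the timeout after every update
                }
                return out.iterator();
            },
        OutputMode.Append(),
        Encoders.bean(DeviceState.class),
        Encoders.STRING(),
        GroupStateTimeout.ProcessingTimeTimeout());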

It is also quite possible that you simply want one of the many standard aggregation functions or a user-defined aggregation function (UDAF).
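
For example, a sketch that builds on the messageDataset from the question and uses only standard functions (the aliases eventCount and lastEventDate are mine):

Dataset<Row> lastSeenPerWindow = messageDataset
    .withWatermark("eventDate", "10 minutes")
    .groupBy(
        functions.col("deviceID"),
        functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
    .agg(
        functions.count(functions.lit(1)).as("eventCount"),
        functions.max("eventDate").as("lastEventDate"));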