Spark Structured Streaming - 在有状态流处理中使用 Window 操作进行事件处理
Spark Structured Streaming - Event processing with Window operation in stateful stream processing
我是 Spark 结构化流处理的新手,目前正在处理一个用例,其中结构化流应用程序将从 Azure IoT 中心-事件中心获取事件(比如每 20 秒后)。
任务是使用这些事件并实时处理。为此,我在 Spark-Java.
下面编写了 Spark Structured streaming program
以下是要点
- 目前我已经用了10分钟应用了window操作
间隔和 5 分钟滑动 window。
- 水印以 10 分钟的间隔应用于 eventDate 参数。
- 目前我没有执行任何其他操作,只是将其以 Parquet 格式存储在指定位置。
- 程序正在将一个事件存储在一个文件中。
问题:
- 是否可以将多个事件以镶木地板格式存储在一个文件中
基于window时间?
- 在这种情况下,window 操作是如何工作的?
- 另外,我想用之前的事件检查事件状态,并根据一些计算(比如 5 分钟内未收到事件)我想更新状态。
...
public class EventSubscriber {
public static void main(String args[]) throws InterruptedException, StreamingQueryException {
String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";
String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();
EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
.setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
.setReceiverTimeout(java.time.Duration.ofMinutes(10));
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");
SparkSession spSession = SparkSession.builder()
.appName("IoT Spark Streaming")
.config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
.getOrCreate();
Dataset<Row> inputStreamDF = spSession.readStream()
.format("eventhubs")
.options(eventHubsConf.toMap())
.load();
Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");
StructType jsonStruct = new StructType()
.add("eventType", DataTypes.StringType)
.add("payload", DataTypes.StringType);
Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
String valStr = value.getString(0).toString();
String payload = valStr;
Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);
JsonElement methodName = jsonObj.get("method");
String eventType = null;
if(methodName != null) {
eventType = "OTHER_EVENT";
} else {
eventType = "DEVICE_EVENT";
}
Row jsonRow = RowFactory.create(eventType, payload);
return jsonRow;
}, RowEncoder.apply(jsonStruct));
messageRow.printSchema();
Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");
deviceEventRowDS.printSchema();
Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {
String jsonString = value.getString(1).toString();
Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
return deviceEvent;
}, Encoders.bean(DeviceEvent.class));
deviceEventDS.printSchema();
Dataset<Row> messageDataset = deviceEventDS.select(
functions.col("eventType"),
functions.col("deviceID"),
functions.col("description"),
functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
functions.col("deviceModel"),
functions.col("pingRate"))
.select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");
messageDataset.printSchema();
Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
.groupBy(functions.col("deviceID"),
functions.window(
functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();
devWindowDataset.printSchema();
StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
.format("parquet")
.option("truncate", "false")
.option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
.start();
query.awaitTermination();
}}
...
任何与此相关的帮助或指导都会有用。
感谢和问候,
Avinash Deshmukh
Is it possible to store multiple events in parquet format in a file based on the window time?
是的。
How does the window operation works in this case?
以下代码是Spark Structured Streaming应用的主要部分:
Dataset<Row> devWindowDataset = messageDataset
.withWatermark("eventDate", "10 minutes")
.groupBy(
functions.col("deviceID"),
functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();
这表示底层状态存储应该按照 deviceID
和 eventDate
保留状态 10 分钟,额外的 10 minutes
(根据 withWatermark
)迟到的事件。换句话说,一旦事件在流式查询开始后 eventDate
20 分钟,您应该会看到结果。
withWatermark
用于延迟事件,因此即使 groupBy
会产生结果,只有超过水印阈值才会发出结果。
并且每 10 分钟(+ 10 分钟的水印)应用相同的程序并使用 5 分钟的 window 幻灯片。
将带有 window
运算符的 groupBy
视为多列聚合。
Also I would like to check the event state with previous event and based on some calculations (say event is not received by 5 minutes) I want to update the state.
这听起来像是 KeyValueGroupedDataset.flatMapGroupsWithState operator (aka Arbitrary Stateful Streaming Aggregation). Consult Arbitrary Stateful Operations 的用例。
您也可能只想要众多 aggregation standard functions or a user-defined aggregation function (UDAF).
中的一个
我是 Spark 结构化流处理的新手,目前正在处理一个用例,其中结构化流应用程序将从 Azure IoT 中心-事件中心获取事件(比如每 20 秒后)。
任务是使用这些事件并实时处理。为此,我在 Spark-Java.
下面编写了 Spark Structured streaming program以下是要点
- 目前我已经用了10分钟应用了window操作 间隔和 5 分钟滑动 window。
- 水印以 10 分钟的间隔应用于 eventDate 参数。
- 目前我没有执行任何其他操作,只是将其以 Parquet 格式存储在指定位置。
- 程序正在将一个事件存储在一个文件中。
问题:
- 是否可以将多个事件以镶木地板格式存储在一个文件中 基于window时间?
- 在这种情况下,window 操作是如何工作的?
- 另外,我想用之前的事件检查事件状态,并根据一些计算(比如 5 分钟内未收到事件)我想更新状态。
...
public class EventSubscriber {
public static void main(String args[]) throws InterruptedException, StreamingQueryException {
String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";
String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();
EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
.setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
.setReceiverTimeout(java.time.Duration.ofMinutes(10));
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");
SparkSession spSession = SparkSession.builder()
.appName("IoT Spark Streaming")
.config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
.getOrCreate();
Dataset<Row> inputStreamDF = spSession.readStream()
.format("eventhubs")
.options(eventHubsConf.toMap())
.load();
Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");
StructType jsonStruct = new StructType()
.add("eventType", DataTypes.StringType)
.add("payload", DataTypes.StringType);
Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
String valStr = value.getString(0).toString();
String payload = valStr;
Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);
JsonElement methodName = jsonObj.get("method");
String eventType = null;
if(methodName != null) {
eventType = "OTHER_EVENT";
} else {
eventType = "DEVICE_EVENT";
}
Row jsonRow = RowFactory.create(eventType, payload);
return jsonRow;
}, RowEncoder.apply(jsonStruct));
messageRow.printSchema();
Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");
deviceEventRowDS.printSchema();
Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {
String jsonString = value.getString(1).toString();
Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
return deviceEvent;
}, Encoders.bean(DeviceEvent.class));
deviceEventDS.printSchema();
Dataset<Row> messageDataset = deviceEventDS.select(
functions.col("eventType"),
functions.col("deviceID"),
functions.col("description"),
functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
functions.col("deviceModel"),
functions.col("pingRate"))
.select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");
messageDataset.printSchema();
Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
.groupBy(functions.col("deviceID"),
functions.window(
functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();
devWindowDataset.printSchema();
StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
.format("parquet")
.option("truncate", "false")
.option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
.start();
query.awaitTermination();
}}
...
任何与此相关的帮助或指导都会有用。
感谢和问候,
Avinash Deshmukh
Is it possible to store multiple events in parquet format in a file based on the window time?
是的。
How does the window operation works in this case?
以下代码是Spark Structured Streaming应用的主要部分:
Dataset<Row> devWindowDataset = messageDataset
.withWatermark("eventDate", "10 minutes")
.groupBy(
functions.col("deviceID"),
functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();
这表示底层状态存储应该按照 deviceID
和 eventDate
保留状态 10 分钟,额外的 10 minutes
(根据 withWatermark
)迟到的事件。换句话说,一旦事件在流式查询开始后 eventDate
20 分钟,您应该会看到结果。
withWatermark
用于延迟事件,因此即使 groupBy
会产生结果,只有超过水印阈值才会发出结果。
并且每 10 分钟(+ 10 分钟的水印)应用相同的程序并使用 5 分钟的 window 幻灯片。
将带有 window
运算符的 groupBy
视为多列聚合。
Also I would like to check the event state with previous event and based on some calculations (say event is not received by 5 minutes) I want to update the state.
这听起来像是 KeyValueGroupedDataset.flatMapGroupsWithState operator (aka Arbitrary Stateful Streaming Aggregation). Consult Arbitrary Stateful Operations 的用例。
您也可能只想要众多 aggregation standard functions or a user-defined aggregation function (UDAF).
中的一个