并行处理 Flink CEP 中的多个模式
Processing multiple patterns in Flink CEP in Parallel
我有以下案例
有 2 个虚拟机正在向 Kafka 发送流,CEP 引擎正在接收这些流,当单个流满足特定条件时会生成警告。
目前,CEP 正在为两个患者检查两个流的相同条件(当心率 > 65 和呼吸率 > 68 时)并并行发出警报,如下所示
// detecting pattern
Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
.subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
@Override
public boolean filter(joinEvent joinEvent) throws Exception {
return joinEvent.getHeartRate() > 65 ;
}
})
.subtype(joinEvent.class)
.where(new FilterFunction<joinEvent>() {
@Override
public boolean filter(joinEvent joinEvent) throws Exception {
return joinEvent.getRespirationRate() > 68;
}
}).within(Time.milliseconds(100));
但我想对两个流使用不同的条件。例如,如果
,我想发出警报
For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78
我该如何实现?我是否需要在同一环境中创建多个流环境或多个模式。
根据您的要求,您可以根据需要创建 2 种不同的图案以实现清晰的分隔。
如果您想使用相同的模式执行此操作,那么也可以。为此,请阅读一个 kafka 来源中的所有 kafka 主题:
FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topic1", "topic2"),
new StringSerializerToEvent(),
props);
这里我假设两个主题的事件结构相同,并且您有患者姓名以及已传输的事件的一部分。
一旦你这样做了,它就变得容易了,因为你只需要用 "Or" 创建一个模式,如下所示:
Pattern.<JoinEvent>begin("first")
.where(new SimpleCondition<JoinEvent>() {
@Override
public boolean filter(JoinEvent event) throws Exception {
return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68;
}
})
.or(new SimpleCondition<JoinEvent>() {
@Override
public boolean filter(JoinEvent event) throws Exception {
return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78;
}
});
只要您的条件匹配,就会产生匹配。虽然,我不太确定“.within(Time.milliseconds(100))”在您的示例中实现了什么。
我有以下案例
有 2 个虚拟机正在向 Kafka 发送流,CEP 引擎正在接收这些流,当单个流满足特定条件时会生成警告。
目前,CEP 正在为两个患者检查两个流的相同条件(当心率 > 65 和呼吸率 > 68 时)并并行发出警报,如下所示
// detecting pattern
Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
.subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
@Override
public boolean filter(joinEvent joinEvent) throws Exception {
return joinEvent.getHeartRate() > 65 ;
}
})
.subtype(joinEvent.class)
.where(new FilterFunction<joinEvent>() {
@Override
public boolean filter(joinEvent joinEvent) throws Exception {
return joinEvent.getRespirationRate() > 68;
}
}).within(Time.milliseconds(100));
但我想对两个流使用不同的条件。例如,如果
,我想发出警报For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78
我该如何实现?我是否需要在同一环境中创建多个流环境或多个模式。
根据您的要求,您可以根据需要创建 2 种不同的图案以实现清晰的分隔。
如果您想使用相同的模式执行此操作,那么也可以。为此,请阅读一个 kafka 来源中的所有 kafka 主题:
FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topic1", "topic2"),
new StringSerializerToEvent(),
props);
这里我假设两个主题的事件结构相同,并且您有患者姓名以及已传输的事件的一部分。
一旦你这样做了,它就变得容易了,因为你只需要用 "Or" 创建一个模式,如下所示:
Pattern.<JoinEvent>begin("first")
.where(new SimpleCondition<JoinEvent>() {
@Override
public boolean filter(JoinEvent event) throws Exception {
return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68;
}
})
.or(new SimpleCondition<JoinEvent>() {
@Override
public boolean filter(JoinEvent event) throws Exception {
return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78;
}
});
只要您的条件匹配,就会产生匹配。虽然,我不太确定“.within(Time.milliseconds(100))”在您的示例中实现了什么。