并行处理 Flink CEP 中的多个模式

Processing multiple patterns in Flink CEP in Parallel

我有以下案例

有 2 个虚拟机正在向 Kafka 发送流,CEP 引擎正在接收这些流,当单个流满足特定条件时会生成警告。

目前,CEP 正在为两个患者检查两个流的相同条件(当心率 > 65 和呼吸率 > 68 时)并并行发出警报,如下所示

 // detecting pattern
        Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
                .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getHeartRate() > 65 ;
                    }
                })
                .subtype(joinEvent.class)
                .where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getRespirationRate() > 68;
                    }
                }).within(Time.milliseconds(100));

但我想对两个流使用不同的条件。例如,如果

,我想发出警报
For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78

我该如何实现?我是否需要在同一环境中创建多个流环境或多个模式。

根据您的要求,您可以根据需要创建 2 种不同的图案以实现清晰的分隔。

如果您想使用相同的模式执行此操作,那么也可以。为此,请阅读一个 kafka 来源中的所有 kafka 主题:

    FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topic1", "topic2"),
        new StringSerializerToEvent(),
        props);

这里我假设两个主题的事件结构相同,并且您有患者姓名以及已传输的事件的一部分。

一旦你这样做了,它就变得容易了,因为你只需要用 "Or" 创建一个模式,如下所示:

    Pattern.<JoinEvent>begin("first")
        .where(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68;
          }
        })
        .or(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78;
          }
        });

只要您的条件匹配,就会产生匹配。虽然,我不太确定“.within(Time.milliseconds(100))”在您的示例中实现了什么。