Why can the same events of a PatternStream be sent to both the PatternSelectFunction and the PatternTimeoutFunction?
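I have to collect 3 events that share the same correlationId from 3 Kafka source streams within a given time, and I must still be able to collect all or some of these events if they arrive late.
I used a union of the 3 DataStreams and a CEP pattern on it. But I noticed that events that match the pattern, and are therefore collected in the select function, are also sent to the timeout function once the timeout is reached.
I don't know what I did wrong in my example or what I have misunderstood, but I expected that positively matched events would not time out as well.
I was under the impression that disjoint time snapshots were stored.
I am using Flink version 1.3.0.
Thanks for your help.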
Console output, where we can see that 2 of the 3 correlated events were both selected and timed out:
Matching events:
Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948
Key---f969dd4d-47ff-445c-9182-0f95a569febb
Key---2ecbb89d-1463-4669-a657-555f73b6fb1d
Timed out events:
First call of the timeout function:
Key---f969dd4d-47ff-445c-9182-0f95a569febb
Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948
Second call:
Key---f969dd4d-47ff-445c-9182-0f95a569febb
11:01:44,677 INFO com.bnpp.pe.cep.Main - Matching events:
11:01:44,678 INFO com.bnpp.pe.cep.Main - SctRequestProcessStep2Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false)
11:01:44,678 INFO com.bnpp.pe.cep.Main - SctRequestProcessStep1Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---2ecbb89d-1463-4669-a657-555f73b6fb1d, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false)
11:01:44,678 INFO com.bnpp.pe.cep.Main - SctRequestProcessStep3Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---f969dd4d-47ff-445c-9182-0f95a569febb, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false)
Right(SctRequestFinalEvent(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---2196fdb0-01e8-4cc6-af4b-04bcf9dc67a2, debtorIban=null, creditorIban=null, amount=null, communication=null), state=SUCCESS))
11:01:49,635 INFO com.bnpp.pe.cep.Main - Timed out events:
11:01:49,636 INFO com.bnpp.pe.cep.Main - SctRequestProcessStep3Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---f969dd4d-47ff-445c-9182-0f95a569febb, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false)
11:01:49,636 INFO com.bnpp.pe.cep.Main - SctRequestProcessStep2Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false)
11:01:49,636 INFO com.bnpp.pe.cep.Main - Timed out events:
11:01:49,636 INFO com.bnpp.pe.cep.Main - SctRequestProcessStep3Event(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---f969dd4d-47ff-445c-9182-0f95a569febb, debtorIban=BE42063929068055, creditorIban=BE42063929068056, amount=100.0, communication=test), succeeded=false)
Left(SctRequestFinalEvent(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---aa437bcf-ecaa-4561-9f4e-08a902f0e248, debtorIban=null, creditorIban=null, amount=null, communication=null), state=FAILED))
Left(SctRequestFinalEvent(super=SctRequestEvent(correlationId=cId---a14a4e23-56c5-4242-9c43-d465d2b84454, key=Key---5420eb41-2723-42ac-83fd-d203d6bf2526, debtorIban=null, creditorIban=null, amount=null, communication=null), state=FAILED))
My test code:
package com.bnpp.pe.cep;
import com.bnpp.pe.event.Event;
import com.bnpp.pe.event.SctRequestFinalEvent;
import com.bnpp.pe.util.EventHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Created by Laurent Bauchau on 2/08/2017.
 */
@Slf4j
public class Main implements Serializable {

    public static void main(String... args) {
        new Main();
    }

    public static final String step1Topic = "sctinst-step1";
    public static final String step2Topic = "sctinst-step2";
    public static final String step3Topic = "sctinst-step3";

    private static final String PATTERN_NAME = "the_3_correlated_events_pattern";

    private final FlinkKafkaConsumer010<Event> kafkaSource1;
    private final DeserializationSchema<Event> deserializationSchema1;
    private final FlinkKafkaConsumer010<Event> kafkaSource2;
    private final DeserializationSchema<Event> deserializationSchema2;
    private final FlinkKafkaConsumer010<Event> kafkaSource3;
    private final DeserializationSchema<Event> deserializationSchema3;

    private Main() {
        // Kafka init
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("zookeeper.connect", "localhost:2180");
        kafkaProperties.setProperty("group.id", "sct-validation-cgroup1");

        deserializationSchema1 = new SctRequestProcessStep1EventDeserializer();
        kafkaSource1 = new FlinkKafkaConsumer010<>(step1Topic, deserializationSchema1, kafkaProperties);
        deserializationSchema2 = new SctRequestProcessStep2EventDeserializer();
        kafkaSource2 = new FlinkKafkaConsumer010<>(step2Topic, deserializationSchema2, kafkaProperties);
        deserializationSchema3 = new SctRequestProcessStep3EventDeserializer();
        kafkaSource3 = new FlinkKafkaConsumer010<>(step3Topic, deserializationSchema3, kafkaProperties);

        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

            DataStream<Event> s1 = env.addSource(kafkaSource1);
            DataStream<Event> s2 = env.addSource(kafkaSource2);
            DataStream<Event> s3 = env.addSource(kafkaSource3);
            DataStream<Event> unionStream = s1.union(s2, s3);

            // Any 3 events (per key) within 5 seconds; typed instead of a raw Pattern
            Pattern<Event, ?> successPattern = Pattern.<Event>begin(PATTERN_NAME)
                    .times(3)
                    .within(Time.seconds(5));

            PatternStream<Event> matchingStream = CEP.pattern(
                    unionStream.keyBy(new CIDKeySelector()),
                    successPattern);

            matchingStream.select(new MyPatternTimeoutFunction(), new MyPatternSelectFunction())
                    .print()
                    .setParallelism(1);

            env.execute();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private static class MyPatternTimeoutFunction implements PatternTimeoutFunction<Event, SctRequestFinalEvent> {
        @Override
        public SctRequestFinalEvent timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
            List<Event> events = pattern.get(PATTERN_NAME);
            log.info("Timed out events:");
            events.forEach(e -> log.info(e.toString()));

            // Resulting event creation
            SctRequestFinalEvent event = new SctRequestFinalEvent();
            EventHelper.correlate(events.get(0), event);
            EventHelper.injectKey(event);
            event.setState(SctRequestFinalEvent.State.FAILED);
            return event;
        }
    }

    private static class MyPatternSelectFunction
            implements PatternSelectFunction<Event, SctRequestFinalEvent> {
        @Override
        public SctRequestFinalEvent select(Map<String, List<Event>> pattern) throws Exception {
            List<Event> events = pattern.get(PATTERN_NAME);
            log.info("Matching events:");
            events.forEach(e -> log.info(e.toString()));

            // Resulting event creation
            SctRequestFinalEvent event = new SctRequestFinalEvent();
            EventHelper.correlate(events.get(0), event);
            EventHelper.injectKey(event);
            event.setState(SctRequestFinalEvent.State.SUCCESS);
            return event;
        }
    }

    private static class CIDKeySelector implements KeySelector<Event, String> {
        @Override
        public String getKey(Event event) throws Exception {
            return event.getCorrelationId();
        }
    }
}
Let's analyze what your pattern says. The pattern you passed is:
Pattern.<Event>begin(PATTERN_NAME)
.times(3)
.within(Time.seconds(5));
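It says: search for a sequence of any three events that occur within 5 seconds. Right now, Flink starts searching for a new match at every subsequent event (there is ongoing work to introduce new MatchingBehaviours, see FLINK-7169).
To show this with a simple example: if you have a sequence like A B C D E within 5 seconds, the CEP library will return these results: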
- A B C
- B C D
- C D E
And also two timeouts, for partial matches that never reached three events:
- D E
- E
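The enumeration above can be reproduced with a small plain-Java model. This is only a sketch of the behaviour described in this answer, not Flink's actual NFA implementation: it assumes that every event starts a new partial match, that all events fall inside the `within` window, and that any partial match still open at the end is reported as a timeout. The class and method names (`OverlappingMatchesSketch`, `run`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverlappingMatchesSketch {

    /** Matches and timed-out partial matches produced by the simplified model. */
    public static final class Result {
        public final List<List<String>> matches = new ArrayList<>();
        public final List<List<String>> timeouts = new ArrayList<>();
    }

    /**
     * Starts a new partial match at every event and appends each event to every
     * open partial match. A partial match that reaches {@code times} events is
     * reported as a match; whatever is still open after the last event is
     * reported as a timeout. (Assumption: all events lie within the time window.)
     */
    public static Result run(List<String> events, int times) {
        Result result = new Result();
        List<List<String>> open = new ArrayList<>();
        for (String event : events) {
            open.add(new ArrayList<>()); // a new partial match starts at this event
            List<List<String>> stillOpen = new ArrayList<>();
            for (List<String> partial : open) {
                partial.add(event);
                if (partial.size() == times) {
                    result.matches.add(partial);   // complete: goes to the select function
                } else {
                    stillOpen.add(partial);        // incomplete: keeps waiting
                }
            }
            open = stillOpen;
        }
        result.timeouts.addAll(open);              // never completed: goes to the timeout function
        return result;
    }

    public static void main(String[] args) {
        Result r = run(Arrays.asList("A", "B", "C", "D", "E"), 3);
        System.out.println("matches:  " + r.matches);
        System.out.println("timeouts: " + r.timeouts);
    }
}
```

Running `main` prints the three overlapping matches followed by the partial matches that were left open. It shows the same effect as in the question: an event that already took part in a complete match can still sit in a later partial match that times out.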
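Your program....
In your program you select text by time, which is why you pass the PatternStream object to BOTH functions. There is no need to select strings by time... you don't need the PatternTimeoutFunction().
Look here, there is no time factor.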
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
public class FlinkCEP {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("localhost", 1111)
                .flatMap(new LineTokenizer());
        text.print();

        Pattern<String, String> pattern =
                Pattern.<String>begin("start").where(txt -> txt.equals("a"))
                        .next("middle").where(txt -> txt.equals("b"))
                        .followedBy("end").where(txt -> txt.equals("c")).within(Time.seconds(1));

        PatternStream<String> patternStream = CEP.pattern(text, pattern);

        DataStream<String> alerts = patternStream.select(new PatternSelectFunction<String, String>() {
            @Override
            public String select(Map<String, String> matches) throws Exception {
                return "Found: " +
                        matches.get("start") + "->" +
                        matches.get("middle") + "->" +
                        matches.get("end");
            }
        });

        // emit result
        alerts.print();

        // execute program
        env.execute("WordCount Example");
    }
}