Multiple window validations in Apache Flink
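I have just started using Apache Flink for stream processing. I receive a stream of JSON records like this:
{
    "token_id": "tok_afgtryuo",
    "ip_address": "128.123.45.1",
    "device_fingerprint": "abcghift",
    "card_hash": "hgtyuigash",
    "bin_number": "424242",
    "last4": "4242",
    "name": "Seu Jorge"
}
I was asked whether I can enforce the following business rules:
Decline if the number of tokens for this IP in the last 10 seconds is > 5
Decline if the number of tokens for this IP in the last minute is > 15
Decline if the number of tokens for this IP in the last hour is > 60
I wrote two classes. In the main class I create a RuleMaker instance and call its window function with different parameters, to avoid duplicating code: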
Main.java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Read newline-delimited JSON from a socket and convert each line to a Token object
    DataStream<Token> baseStream =
            env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", 1))
               .map(new MapTokens());

    // 1 - Decline if number of tokens > 5 for this IP in the last 10 seconds
    DataStreamSink<String> response1 = new RuleMaker()
            .getStreamKeyCount(baseStream, "ip", Time.seconds(10), 5, "seconds").print();

    // 2 - Decline if number of tokens > 15 for this IP in the last minute
    DataStreamSink<String> response2 = new RuleMaker()
            .getStreamKeyCount(baseStream, "ip", Time.minutes(1), 15, "minutes").print();

    // 3 - Decline if number of tokens > 60 for this IP in the last hour
    DataStreamSink<String> response3 = new RuleMaker()
            .getStreamKeyCount(baseStream, "ip", Time.hours(1), 60, "hours").print();

    env.execute("Job2");
}
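The other class holds all the rule logic. It counts how many times an IP address occurs and, if the count exceeds the number allowed for the window, emits a message with some information:
RuleMaker.java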
public class RuleMaker {

    public DataStream<String> getStreamKeyCount(DataStream<Token> stream,
                                                String tokenProp,
                                                Time time,
                                                Integer maxPetitions,
                                                String ruleType) {
        return stream
                // Emit (key, 1, tokenId), where the key is the property selected by tokenProp
                .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                    @Override
                    public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
                        String tokenSelection = "";
                        switch (tokenProp) {
                            case "ip":
                                tokenSelection = token.getIpAddress();
                                break;
                            case "device":
                                tokenSelection = token.getDeviceFingerprint();
                                break;
                            case "cardHash":
                                tokenSelection = token.getCardHash();
                                break;
                        }
                        collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                    }
                })
                .keyBy(0)
                .timeWindow(time)
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));
    }

    // Processes the elements of one window for one key.
    // Declared static so that it does not hold a reference to the enclosing RuleMaker.
    private static class MyProcessWindowFunction extends ProcessWindowFunction<
            Tuple3<String, Integer, String>,
            String,
            Tuple,
            TimeWindow> {

        private final Integer _maxPetitions;
        private final String _ruleType;

        public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
            this._maxPetitions = maxPetitions;
            this._ruleType = ruleType;
        }

        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {
            int counter = 0;
            for (Tuple3<String, Integer, String> element : iterable) {
                // Add the element's count; the original "element.f1++" also mutated the element
                counter += element.f1;
                if (counter > _maxPetitions) {
                    // f2 is the token id, f0 is the key (for example the IP address)
                    out.collect("Element declined: " + element.f2 + " Num elements: " + counter + " rule type: " + _ruleType + " key: " + element.f0);
                    counter = 0;
                }
            }
        }
    }
}
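So far I think this code works, but I am a beginner with Apache Flink. I would appreciate it if you could tell me whether there is anything wrong with the way I am using it and point me in the right direction.
Thanks a lot.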
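The general approach looks good, although I would think the Table API is powerful enough to help you here and would be more concise; it also supports JSON out of the box.
If you want to stick with the DataStream API, the switch around tokenProp in getStreamKeyCount should be replaced by passing a key extractor to getStreamKeyCount, so that there is only one place to touch when adding a new rule.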
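public DataStream<String> getStreamKeyCount(DataStream<Token> stream,
                                            KeySelector<Token, String> keyExtractor,
                                            Time time,
                                            Integer maxPetitions,
                                            String ruleType) {
    return stream
            // Emit (key, 1, tokenId); the key comes from the caller-supplied extractor
            .map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
            // Lambdas lose generic type information, so declare the tuple type explicitly
            // (Types is org.apache.flink.api.common.typeinfo.Types)
            .returns(Types.TUPLE(Types.STRING, Types.INT, Types.STRING))
            .keyBy(0)
            .timeWindow(time)
            .process(new MyProcessWindowFunction(maxPetitions, ruleType));
}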
The call then becomes:
DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream,
        Token::getIpAddress, Time.minutes(1), 15, "minutes").print();
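To show the key-extractor idea in isolation, here is a minimal plain-Java sketch that does not use Flink at all. It assumes a simplified, hypothetical Token class with a timestamp field, and it imitates a fixed-size (tumbling) window by rounding each timestamp down to the start of its window. The names KeyedWindowCounter and violations are made up for this illustration and are not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration (no Flink): count tokens per key in fixed-size windows
// and report every key/window pair whose count exceeds the allowed maximum.
final class KeyedWindowCounter {

    // Hypothetical stand-in for the Token class from the question.
    static final class Token {
        final String tokenId;
        final String ipAddress;
        final String deviceFingerprint;
        final long timestampMillis;

        Token(String tokenId, String ipAddress, String deviceFingerprint, long timestampMillis) {
            this.tokenId = tokenId;
            this.ipAddress = ipAddress;
            this.deviceFingerprint = deviceFingerprint;
            this.timestampMillis = timestampMillis;
        }
    }

    // The caller decides which property is the key by passing an extractor,
    // so a new rule needs no change inside this method.
    static List<String> violations(List<Token> tokens,
                                   Function<Token, String> keyExtractor,
                                   long windowMillis,
                                   int maxPerWindow) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Token token : tokens) {
            // Round the timestamp down to the start of its window
            long windowStart = token.timestampMillis - (token.timestampMillis % windowMillis);
            String bucket = keyExtractor.apply(token) + "@" + windowStart;
            counts.merge(bucket, 1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > maxPerWindow) {
                result.add(entry.getKey() + " count=" + entry.getValue());
            }
        }
        return result;
    }
}
```

A rule on the IP address is then `KeyedWindowCounter.violations(tokens, t -> t.ipAddress, 10_000L, 5)`, and a rule on the device fingerprint only swaps the extractor for `t -> t.deviceFingerprint`. In the Flink version the same role is played by the KeySelector argument.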