多个 Apache Flink windows 验证

Multiple Apache Flink windows validations

我刚刚开始使用 Apache Flink 进行流处理,问题是我收到了 Json 的流,如下所示:

{

  token_id: “tok_afgtryuo”,

  ip_address: “128.123.45.1“,

  device_fingerprint: “abcghift”,

  card_hash: “hgtyuigash”,

  “bin_number”: “424242”,

  “last4”: “4242”,

  “name”: “Seu Jorge”

}

并被问及我是否可以满足以下业务规则:

我制作了 2 classes,main class 当我创建一个实例来调用具有不同参数的 Window 函数以避免重复代码时:

Main.java

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //This DataStream Would be  Converting the Json to a Token Object
        DataStream<Token> baseStream =
                env.addSource(new SocketTextStreamFunction("localhost",
                        9999,
                        "\n",
                        1))
                        .map(new MapTokens());


        // 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
       DataStreamSink<String> response1 =  new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
               5, "seconds").print();

        //2 -Decline if number of tokens > 15 for this IP in last minute
        DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
                62, "minutes").print();

        //3- Decline if number of tokens > 60 for this IP in last hour
        DataStreamSink<String> response3  = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
                60, "Hours").print();

        env.execute("Job2");
    }

还有另一个 class,我正在执行所有规则逻辑,我正在计算 IP 地址出现的次数,如果它超过允许的次数 window 我正在回复一条包含一些信息的消息:

Rulemaker.java

public class RuleMaker {


    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                String tokenProp,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){

        return
               stream
                .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                    @Override
                    public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {

                         String tokenSelection = "";
                        switch (tokenProp)
                        {
                            case "ip":
                                tokenSelection = token.getIpAddress();
                                break;
                            case "device":
                                tokenSelection = token.getDeviceFingerprint();
                                break;
                            case "cardHash":
                                tokenSelection = token.getCardHash();
                                break;
                        }
                        collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                    }
                })
                .keyBy(0)
                .timeWindow(time)
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));
    }

    //Class to process the elements from the window
    private class MyProcessWindowFunction extends ProcessWindowFunction<
            Tuple3<String, Integer, String>,
            String,
            Tuple,
            TimeWindow
            > {

        private Integer _maxPetitions;
        private String  _ruleType;


        public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
            this._maxPetitions = maxPetitions;
            this._ruleType = ruleType;
        }

        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {

            Integer counter = 0;
            for (Tuple3<String, Integer, String> element : iterable) {
                counter += element.f1++;
                if(counter > _maxPetitions){
                    out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " +  _ruleType + " token: " + element.f0 );
                    counter = 0;
                }
            }
        }
    }
}

到目前为止,我认为这段代码可以正常工作,但我是 Apache Flink 的初学者,如果你能告诉我我尝试使用的方式是否有问题,我将不胜感激这并指出我正确的方向。

非常感谢。

一般方法看起来很不错,虽然我会认为 Table API would be powerful enough to help you (more concise) which supports Json 开箱即用。

如果你想坚持使用 DataStream API,在 getStreamKeyCount 中,围绕 tokenProp 的开关应该通过将密钥提取器传递给 getStreamKeyCount 来替换,只一个添加新规则的地方。

public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                            KeySelector<Token, String> keyExtractor,
                                            Time time, 
                                            Integer maxPetitions, 
                                            String ruleType){

    return stream
         .map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
            .keyBy(0)
            .timeWindow(time)
            .process(new MyProcessWindowFunction(maxPetitions, ruleType));
}

那么调用就变成了

DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream, 
    Token::getIpAddress, Time.minutes(1), 62, "minutes");