Flink: using .keyBy with a KeySelector on data from Kafka

I have a Flink job in Java with a Kafka connector. Getting data from Kafka works fine; as a first step I apply a .map to extract a timestamp from each message. To use event-time windows, I extract the timestamp in milliseconds from the data and return it to Flink. For that I used "assignTimestampsAndWatermarks":

 DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer("CorID_0", new SimpleStringSchema(), p));

    kafkaData.map(new MapFunction<
            String, Tuple19<String, String, String, String, String,
            String, Double, Long, Double, Long,
            Long, Integer, Long, Double, Long,
            Double, Double, Integer, Double>>()
    {
        public Tuple19<String, String, String, String, String,
                String, Double, Long, Double, Long,
                Long, Integer, Long, Double, Long,
                Double, Double, Integer, Double> map(String value)
        {
            String[] words = value.split(",");
            return new Tuple19<String, String, String, String, String,
                    String, Double, Long, Double, Long,
                    Long, Integer, Long, Double, Long,
                    Double, Double, Integer, Double>
                    (words[0], words[1], words[2], words[3], words[4], words[5], Double.parseDouble(words[6]),
                            Long.parseLong(words[7]), Double.parseDouble(words[8]), Long.parseLong(words[9]),
                            Long.parseLong(words[10]), Integer.parseInt(words[11]),
                            Long.parseLong(words[12]), Double.parseDouble(words[13]),
                            Long.parseLong(words[14]), Double.parseDouble(words[15]),
                            Double.parseDouble(words[16]), Integer.parseInt(words[17]),
                            Double.parseDouble(words[18]));
        }
    })

            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String, String, String, String, String,
                    String, Double, Long, Double, Long,
                    Long, Integer, Long, Double, Long,
                    Double, Double, Integer, Double>>()
            {
                private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
                
                public long extractAscendingTimestamp(Tuple19<String, String, String, String, String,
                        String, Double, Long, Double, Long,
                        Long, Integer, Long, Double, Long,
                        Double, Double, Integer, Double> value)
                {
                    try
                    {
                        Timestamp ts = new Timestamp(sdf.parse(value.f3).getTime());
                        return ts.getTime();
                    } catch (Exception e)
                    {
                        throw new RuntimeException("Parsing Error");
                    }
                }
            });

The second step is where the computation starts. I'm trying to perform some operations on the data, and for that I need the fields of the Kafka message, which is basically where I'm stuck.

DataStream<String> largeDelta = kafkaData .keyBy(new KeySelector<Tuple19<String,String,String,String,String,
                String,Double,Long,Double,Long,
                Long,Integer,Long,Double,Long,
                Double,Double, Integer,Double>, String>()
                {
            public String getKey(Tuple19<String,String,String,String,String,
                    String,Double,Long,Double,Long,
                    Long,Integer,Long,Double,Long,
                    Double,Double, Integer,Double> value)
                {
                return value.f2;
                }
                })

                 .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                 .process(new TrackChanges(5));
        largeDelta.writeAsText("/Alert.txt");
        env.execute("ABCD");

The problem is that I get an error message telling me "Cannot resolve method 'keyBy(anonymous org.apache.flink.api.java.functions....'"

Any help is very welcome, as I'm struggling to understand what I'm missing.

Thanks

I'm guessing that your new MapFunction()... is converting the incoming String to a Tuple2<String, String>, otherwise the KeySelector<Tuple2<String, String>, String> would make no sense.

If so, then you need to assign the result of kafkaData.map(new MapFunction<... to a DataStream<Tuple2<String, String>>, and then call keyBy on that, rather than on kafkaData.
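In concrete terms, a sketch of that restructuring (using the Tuple2<String, String> element type assumed above for brevity; in the question's code the actual type would be the full Tuple19, and the extractor stays as written):

```java
// Assign the mapped-and-timestamped stream to its own variable ...
DataStream<Tuple2<String, String>> parsed = kafkaData
        .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) {
                String[] words = value.split(",");
                return new Tuple2<>(words[0], words[1]);
            }
        })
        .assignTimestampsAndWatermarks(/* same extractor as before */);

// ... and key/window the parsed stream, not the raw DataStream<String>
parsed.keyBy(r -> r.f1)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .process(new TrackChanges(5));
```

The original code chains .map(...).assignTimestampsAndWatermarks(...) without keeping the result, then calls keyBy on kafkaData, which is still a DataStream<String>; that type mismatch is what the compiler is complaining about.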

Having said that, I don't see how a Tuple2<String, String> keyBy().window() winds up as a DataStream<String> largeDelta. So it feels like there are multiple issues here.

As a side note, for simple key selectors, don't define an anonymous function; use a lambda expression instead. E.g. kafkaData.keyBy(r -> r.f1) will do.