Using Flink's .keyBy with a KeySelector on data from Kafka
I have a Flink job in Java with a Kafka connector. I have no problem getting data from Kafka; as a first step I do a .map that extracts the timestamp from each message. To use event-time windows, I extract the timestamp in milliseconds from the data and return it to Flink via "assignTimestampsAndWatermarks":
DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer("CorID_0", new SimpleStringSchema(), p));
kafkaData.map(new MapFunction<
String, Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double>>()
{
public Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double> map(String value)
{
String[] words = value.split(",");
return new Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double>
(words[0], words[1], words[2], words[3], words[4], words[5], Double.parseDouble(words[6]),
Long.parseLong(words[7]), Double.parseDouble(words[8]), Long.parseLong(words[9]),
Long.parseLong(words[10]), Integer.parseInt(words[11]),
Long.parseLong(words[12]), Double.parseDouble(words[13]),
Long.parseLong(words[14]), Double.parseDouble(words[15]),
Double.parseDouble(words[16]), Integer.parseInt(words[17]),
Double.parseDouble(words[18]));
}
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double>>()
{
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
public long extractAscendingTimestamp(Tuple19<String, String, String, String, String,
String, Double, Long, Double, Long,
Long, Integer, Long, Double, Long,
Double, Double, Integer, Double> value)
{
try
{
Timestamp ts = new Timestamp(sdf.parse(value.f3).getTime());
return ts.getTime();
} catch (Exception e)
{
throw new RuntimeException("Parsing Error");
}
}
});
The second step is where the computation starts. I'm trying to do some operations on the data, and for that I need to key the stream by a field from the Kafka messages, which is basically where I'm stuck:
DataStream<String> largeDelta = kafkaData.keyBy(new KeySelector<Tuple19<String,String,String,String,String,
String,Double,Long,Double,Long,
Long,Integer,Long,Double,Long,
Double,Double, Integer,Double>, String>()
{
public String getKey(Tuple19<String,String,String,String,String,
String,Double,Long,Double,Long,
Long,Integer,Long,Double,Long,
Double,Double, Integer,Double> value)
{
return value.f2;
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new TrackChanges(5));
largeDelta.writeAsText("/Alert.txt");
env.execute("ABCD");
The problem is that I get an error message telling me "Cannot resolve method 'KeyBy(anonymous org.apache.flink.api.java.functions....'".
Any help would be very welcome, as I'm struggling to understand what I'm missing.
Thanks
I'm going to guess that your new MapFunction()... is converting the incoming String into a Tuple2<String, String>, as otherwise the KeySelector<Tuple2<String, String>, String> wouldn't make sense.
If so, then you need to assign the result of kafkaData.map(new MapFunction<... to a DataStream<Tuple2<String, String>>, and then use that stream with your keyBy.
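A minimal sketch of that change, written against the Tuple19 pipeline from the question (the map and timestamp-extractor bodies are elided as comments, since they are unchanged from the question):

// Capture the mapped, timestamped stream in a typed variable instead of
// calling keyBy on the original DataStream<String>.
DataStream<Tuple19<String, String, String, String, String,
        String, Double, Long, Double, Long,
        Long, Integer, Long, Double, Long,
        Double, Double, Integer, Double>> parsed = kafkaData
        .map(/* the Tuple19 MapFunction from the question */)
        .assignTimestampsAndWatermarks(/* the AscendingTimestampExtractor from the question */);

// keyBy now resolves: parsed is a stream of Tuple19, which matches the
// KeySelector's input type.
DataStream<String> largeDelta = parsed
        .keyBy(/* the KeySelector from the question */)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new TrackChanges(5));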
Having said that, I don't see how a keyBy().window() over Tuple2<String, String> would turn into a DataStream<String> largeDelta, so it feels like there are multiple issues here.
As an aside, for simple key selectors don't define an anonymous function; use a lambda expression instead. E.g. kafkaData.keyBy(r -> r.f1) would do it.
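Applied to the question's code, the whole anonymous KeySelector block collapses to a single line (parsed being the Tuple19 stream from the sketch above):

parsed.keyBy(value -> value.f2)   // same key field as the anonymous getKey()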