如何理解 Apache Flink 中的 Window 机制
How to understand Window mechanism in Apache Flink
正在学习使用Flink处理流式数据
根据我的理解,我可以使用函数map
多次进行各种变换
说Data Source一直在往Flink发送String。所有String都是JSON格式的数据如下:
{"name":"titi","age":18}
{"name":"toto","age":20}
...
这是我的代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>()))
.build();
// Convert String to Json Object
// MyJson is a POJO class, defined by me
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
.map(new MapFunction<String, MyJson>() {
@Override
public MyJson map(String s) throws Exception {
MyJson myJson = JSON.parseObject(s, MyJson.class);
return myJson;
}
});
// Convert MyJson Object to String and extract what I need
DataStream<String> valueInJson = jsonStream
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.getName().toString();
}
});
valueInJson.print();
env.execute("StreamingJob");
如您所见,我的示例非常简单:
获取并反序列化数据 ---> 将字符串转换为 Json 对象 ---> 将 Json 对象转换为字符串并获取我需要的内容(我这里只需要 name
)。
目前看来一切正常。我确实从日志文件中得到了预期的输出。
不过,我知道Flink为我们提供了一个强大的功能:Window.
我想知道如何在我的例子中使用这个机制。
例如,如果我想用 2 秒 windows 拆分数据流,如何编码?
我试过这样:
DataStream<String> valueInJson = jsonStream
.timeWindow(Time.seconds(2))
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.toString();
}
});
valueInJson.print();
但是,我得到一个错误:
cannot find symbol
symbol: method
timeWindow(org.apache.flink.streaming.api.windowing.time.Time)
location: variable jsonStream of type
org.apache.flink.streaming.api.datastream.DataStream
但是,我导入了:
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
为什么我会收到这个错误?我用错了 Windows 吗?我是否对 Flink 有所了解?
你有错误,因为 timeWindow()
函数是在 KeyedStream
中定义的,而不是在 DataStream
中定义的,因为它是基于键的操作。在您的情况下,将 timeWindow()
更改为 timeWindowAll()
.
就足够了
正在学习使用Flink处理流式数据
根据我的理解,我可以使用函数map
多次进行各种变换
说Data Source一直在往Flink发送String。所有String都是JSON格式的数据如下:
{"name":"titi","age":18}
{"name":"toto","age":20}
...
这是我的代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>()))
.build();
// Convert String to Json Object
// MyJson is a POJO class, defined by me
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
.map(new MapFunction<String, MyJson>() {
@Override
public MyJson map(String s) throws Exception {
MyJson myJson = JSON.parseObject(s, MyJson.class);
return myJson;
}
});
// Convert MyJson Object to String and extract what I need
DataStream<String> valueInJson = jsonStream
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.getName().toString();
}
});
valueInJson.print();
env.execute("StreamingJob");
如您所见,我的示例非常简单:
获取并反序列化数据 ---> 将字符串转换为 Json 对象 ---> 将 Json 对象转换为字符串并获取我需要的内容(我这里只需要 name
)。
目前看来一切正常。我确实从日志文件中得到了预期的输出。
不过,我知道Flink为我们提供了一个强大的功能:Window.
我想知道如何在我的例子中使用这个机制。
例如,如果我想用 2 秒 windows 拆分数据流,如何编码?
我试过这样:
DataStream<String> valueInJson = jsonStream
.timeWindow(Time.seconds(2))
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.toString();
}
});
valueInJson.print();
但是,我得到一个错误:
cannot find symbol
symbol: method
timeWindow(org.apache.flink.streaming.api.windowing.time.Time)
location: variable jsonStream of type org.apache.flink.streaming.api.datastream.DataStream
但是,我导入了:
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
为什么我会收到这个错误?我用错了 Windows 吗?我是否对 Flink 有所了解?
你有错误,因为 timeWindow()
函数是在 KeyedStream
中定义的,而不是在 DataStream
中定义的,因为它是基于键的操作。在您的情况下,将 timeWindow()
更改为 timeWindowAll()
.