如何在 Flink 流中缓存进程级别的局部变量?
How to cache the local variable at process level in Flink streaming?
在Flink任务实例中,当事件来临时我需要访问远程网络服务来获取一些数据,但是我不想每次事件来临时都访问远程网络服务,所以我需要将数据缓存在本地内存可以被进程的所有任务访问,怎么办?将数据存储在 class 级别的静态私有变量中 ?
例如下面的例子,如果在class Splitter设置局部变量localCache,它缓存在操作员级别而不是进程级别。
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
***private object localCache ;***
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
正如你所说。您将在 RichFlatMapFunction
中使用静态变量并在 open
中对其进行初始化。 open
将在输入任何记录之前在每个 TaskManager 上调用。请注意,每个不同的插槽都会创建一个 Splitter 实例,因此在大多数情况下,一个 TaskManager 上会有多个 Splitter 实例。因此,您需要防止重复创建。
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private transient Object localCache;
@Override
public void open(Configuration parameters) throws Exception {
if (localCache == null)
localCache = ... ;
}
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
可扩展的方法可能使用 Source 运算符实际执行对 Web 服务的调用,然后将结果写入流。然后,您可以将该流作为广播流访问到您的操作员,从而导致发送到广播流的一个对象(网络调用结果)被发送到接收操作员的每个实例。这将在集群中的所有机器和 JVM 之间共享该单个 Web 调用的结果。您还可以保留广播状态并在集群扩展时与您的操作员的新实例共享它。
在Flink任务实例中,当事件来临时我需要访问远程网络服务来获取一些数据,但是我不想每次事件来临时都访问远程网络服务,所以我需要将数据缓存在本地内存可以被进程的所有任务访问,怎么办?将数据存储在 class 级别的静态私有变量中 ?
例如下面的例子,如果在class Splitter设置局部变量localCache,它缓存在操作员级别而不是进程级别。
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
***private object localCache ;***
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
正如你所说。您将在 RichFlatMapFunction
中使用静态变量并在 open
中对其进行初始化。 open
将在输入任何记录之前在每个 TaskManager 上调用。请注意,每个不同的插槽都会创建一个 Splitter 实例,因此在大多数情况下,一个 TaskManager 上会有多个 Splitter 实例。因此,您需要防止重复创建。
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private transient Object localCache;
@Override
public void open(Configuration parameters) throws Exception {
if (localCache == null)
localCache = ... ;
}
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
可扩展的方法可能使用 Source 运算符实际执行对 Web 服务的调用,然后将结果写入流。然后,您可以将该流作为广播流访问到您的操作员,从而导致发送到广播流的一个对象(网络调用结果)被发送到接收操作员的每个实例。这将在集群中的所有机器和 JVM 之间共享该单个 Web 调用的结果。您还可以保留广播状态并在集群扩展时与您的操作员的新实例共享它。