对象重用 - 改变相同的对象 - 在 Flink 运算符中

Object reuse - mutating same object - in Flink operators

我正在阅读文档 here,它给出了重用对象的用例,如下所示:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    }

所以每次都不是创建一个新的元组,它似乎能够通过使用它的可变性质来使用同一个元组,以减少 GC 的压力。它是否适用于所有运营商,我们可以通过 collector.collect(...) 调用在管道中改变和传递相同的对象?

通过使用这个想法,我想知道我可以在哪些地方进行这样的优化而不会破坏代码或引入偷偷摸摸的错误。再举一个例子,KeySelector returns 一个元组取自下面给出的 答案:

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        return Tuple2.of(value.getCountry(), value.getEmployer());
      }
    }
  );

我想知道在这种情况下,我是否可以通过使用不同的输入对其进行变异来重用同一个元组,如下所示。当然,在所有情况下,我都假设并行度大于 1,在实际用例中可能更高。

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {
      
      Tuple2<String, String> tuple = new Tuple2<>();

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        tuple.f0 = value.getCountry();
        tuple.f1 = value.value.getEmployer();
        return tuple;
      }
    }
  );

我不知道,如果 Flink 在管道的阶段之间复制对象,所以我想知道做这样的优化是否安全。我在文档中阅读了 enableObjectReuse() 配置,但我不确定我是否真的理解它。实际上,它可能有点 Flink 内部结构,虽然无法理解 Flink 什么时候在管道中管理 data/object/records。也许我应该先说清楚?

谢谢,

查看 Dave Anderson 对

的回答

基本上你不记得跨函数调用的输入对象引用或 修改输入对象。因此,在上述 KeySelector 情况下,您修改的是您创建的对象,而不是输入对象。

这是一种 KeySelector 中的重用 安全。 keyBy 不是运算符,运算符链中有关对象重用的通常规则(我已介绍 )不适用。