Flink DataStream 有没有像 mapPartition 这样的 api?

Does Flink DataStream have api like mapPartition?

我想像这样在 stream.map() 中使用不可序列化的对象

stream.map { i =>
  val obj = new SomeUnserializableClass()
  obj.doSomething(i)
}

效率很低,因为我创建了很多SomeUnserializableClass实例。实际上,它只能在每个 worker 中创建一次。

在 Spark 中,我可以使用 mapPartition 来执行此操作。但是在flink stream api中,我不知道。

如果您正在处理不可序列化的 class,我建议您创建一个 RichFunction。在你的例子中是 RichMapFunction.

Flink 中的 Rich operator 有一个 open 方法,它在任务管理器中仅作为初始化程序执行一次。

所以诀窍是让你的字段瞬态化并在你的 open 方法中实例化它。

检查下面的例子:

public class NonSerializableFieldMapFunction extends RichMapFunction {

    transient SomeUnserializableClass someUnserializableClass;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.someUnserializableClass = new SomeUnserializableClass();
    }

    @Override
    public Object map(Object o) throws Exception {
        return someUnserializableClass.doSomething(o);
    }
}

那么您的代码将如下所示:

stream.map(new NonSerializableFieldMapFunction())

P.D: 我正在使用 java 语法,请使其适应 scala。