Flink DataStream 有没有像 mapPartition 这样的 api?
Does Flink DataStream have api like mapPartition?
我想像这样在 stream.map()
中使用不可序列化的对象
stream.map { i =>
val obj = new SomeUnserializableClass()
obj.doSomething(i)
}
效率很低,因为我创建了很多SomeUnserializableClass
实例。实际上,它只能在每个 worker 中创建一次。
在 Spark 中,我可以使用 mapPartition
来执行此操作。但是在flink stream api中,我不知道。
如果您正在处理不可序列化的 class,我建议您创建一个 RichFunction。在你的例子中是 RichMapFunction.
Flink 中的 Rich operator 有一个 open
方法,它在任务管理器中仅作为初始化程序执行一次。
所以诀窍是让你的字段瞬态化并在你的 open 方法中实例化它。
检查下面的例子:
public class NonSerializableFieldMapFunction extends RichMapFunction {
transient SomeUnserializableClass someUnserializableClass;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.someUnserializableClass = new SomeUnserializableClass();
}
@Override
public Object map(Object o) throws Exception {
return someUnserializableClass.doSomething(o);
}
}
那么您的代码将如下所示:
stream.map(new NonSerializableFieldMapFunction())
P.D: 我正在使用 java 语法,请使其适应 scala。
我想像这样在 stream.map()
中使用不可序列化的对象
stream.map { i =>
val obj = new SomeUnserializableClass()
obj.doSomething(i)
}
效率很低,因为我创建了很多SomeUnserializableClass
实例。实际上,它只能在每个 worker 中创建一次。
在 Spark 中,我可以使用 mapPartition
来执行此操作。但是在flink stream api中,我不知道。
如果您正在处理不可序列化的 class,我建议您创建一个 RichFunction。在你的例子中是 RichMapFunction.
Flink 中的 Rich operator 有一个 open
方法,它在任务管理器中仅作为初始化程序执行一次。
所以诀窍是让你的字段瞬态化并在你的 open 方法中实例化它。
检查下面的例子:
public class NonSerializableFieldMapFunction extends RichMapFunction {
transient SomeUnserializableClass someUnserializableClass;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.someUnserializableClass = new SomeUnserializableClass();
}
@Override
public Object map(Object o) throws Exception {
return someUnserializableClass.doSomething(o);
}
}
那么您的代码将如下所示:
stream.map(new NonSerializableFieldMapFunction())
P.D: 我正在使用 java 语法,请使其适应 scala。