The implementation of the MapFunction is not serializable (Flink)

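I am trying to implement a class that lets the user work with N input streams, regardless of the type of each input stream.

First, I want to convert all input DataStreams into KeyedStreams. To do that, I map each input stream into tuples and then apply keyBy to turn it into a keyed stream.

I keep running into serialization problems. I tried to follow this guide https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html but without success.

What I would like to know is:

  1. What is serialization/deserialization in Java, and what is it used for?
  2. Which problems can I solve with serialization in Flink?
  3. What is wrong with my code (you can find the code and the error message below)?

Thanks a lot.

Main class: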

public class CEP {

private  Integer streamsIdComp = 0;
final  private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final  private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    return Tuple2.of(streamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}

public <T1> void addInputStream(DataStream<T1> inputStream) {

    TypeInformation<T1> streamType = inputStream.getType();

    dataStreamsTypes.put(streamsIdComp, streamType);
    dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
    streamsIdComp++;
}
}

Test class:

public class CEPTest {

@Test
public void addInputStreamTest() throws Exception {
    //test if we can change keys in a keyedStream
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Record> input1 = env.fromElements(
            new Record("1", 1, "a"),
            new Record("2", 2, "b"),
            new Record("3", 3, "c"))
            .keyBy(Record::getBizName);

    DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);

    CEP cepObject = new CEP();
    cepObject.addInputStream(input1);
    cepObject.addInputStream(input2);

   }
}

Error message:

org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction 
is not serializable. The implementation accesses fields of its enclosing class, which is a 
common reason for non-serializability. A common solution is to make the function a proper 
(non-inner) class, or a static inner class.

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.NotSerializableException: CEP
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 29 more

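Flink is a distributed framework. That means your program can potentially run on thousands of nodes. It also means that each worker node has to receive the code to be executed along with the context it needs. Simplifying a bit, both the events flowing through the system and the functions to be executed have to be serializable, because they are transferred over the wire. This is why serialization matters in distributed programming in general.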


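In short, serialization is the process of encoding data into a byte representation that can be transferred to another node (another JVM) and restored there.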
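As a rough illustration of that round trip, here is a minimal sketch using plain Java serialization, with no Flink involved. The `Event` class and the `serialize`/`deserialize` helpers are hypothetical names made up for this example; a byte array stands in for the network between two JVMs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationRoundTrip {

    // Hypothetical event type; implementing Serializable opts it in to Java serialization.
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int count;

        Event(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    // Encode an object into bytes, as would happen before sending it to another JVM.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Rebuild an object from bytes, as the receiving JVM would.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Event original = new Event("a", 1);
        byte[] wire = serialize(original);       // object -> bytes
        Event copy = (Event) deserialize(wire);  // bytes -> new, equivalent object
        System.out.println(copy.name + " " + copy.count + " (" + wire.length + " bytes)");
    }
}
```

The restored `copy` is a different object with the same field values. If `Event` did not implement `Serializable`, `writeObject` would throw a `java.io.NotSerializableException`, which is the exception that shows up at the bottom of the stack trace in the question.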


Back to the problem. Here is your cause:

Caused by: java.io.NotSerializableException: CEP

which is caused by the line

return Tuple2.of(streamsIdComp, value);

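You are using the streamsIdComp variable, which is a field of the CEP class. The anonymous MapFunction reads it through its reference to the enclosing CEP instance, so Flink has to serialize the whole CEP object to be able to access that field when executing the MapFunction, and CEP is not serializable. You can overcome this by introducing a local variable in the converttoKeyedStream function: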

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();
    // note this variable is local
    int localStreamsIdComp = streamsIdComp;

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    // and is used here
                    return Tuple2.of(localStreamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}

This way Flink only has to serialize that single variable, not the whole class.
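The error message also suggests another route: make the function a proper (non-inner) class or a static nested class, so that it carries only the values it needs. The sketch below illustrates that idea in plain Java, without Flink on the classpath. `SerializableMapper`, `TagWithId` and `Holder` are hypothetical stand-ins for `MapFunction`, a standalone function class, and `CEP`, and `isSerializable` only imitates the kind of serialization check Flink performs. Note that in plain Java an anonymous class created in an instance method keeps its reference to the enclosing object, so the local-variable version works in Flink because, as far as I can tell from `ClosureCleaner`, Flink clears that reference when the function never uses it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's MapFunction: a serializable single-method interface.
    interface SerializableMapper<T, R> extends Serializable {
        R map(T value);
    }

    // Static nested class: no hidden reference to an enclosing instance; only `id` is serialized.
    static class TagWithId implements SerializableMapper<String, String> {
        private static final long serialVersionUID = 1L;
        private final int id;

        TagWithId(int id) {
            this.id = id;
        }

        @Override
        public String map(String value) {
            return id + ":" + value;
        }
    }

    // Stand-in for the CEP class from the question; it does not implement Serializable.
    static class Holder {
        private Integer streamId = 7;

        // Anonymous class that reads a field of Holder, so it must keep a reference to this Holder.
        SerializableMapper<String, String> readsField() {
            return new SerializableMapper<String, String>() {
                @Override
                public String map(String value) {
                    return streamId + ":" + value;
                }
            };
        }

        // Copies the current field value into a standalone function object.
        SerializableMapper<String, String> copiesValue() {
            return new TagWithId(streamId);
        }
    }

    // Returns true if Java serialization accepts the object, false on NotSerializableException.
    static boolean isSerializable(Object obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Holder holder = new Holder();
        System.out.println("reads field:  " + isSerializable(holder.readsField()));
        System.out.println("copies value: " + isSerializable(holder.copiesValue()));
    }
}
```

Running `main` should report `false` for the function that reads the field and `true` for the one that copies the value. In the question's code, the equivalent change would be a static nested class implementing `MapFunction<T, Tuple2<Integer, T>>` that receives the stream id through its constructor.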