Flink 复杂拓扑(多输入)的集成测试
Integration test for complex topology (multiple inputs) in Flink
我需要为 flink 流拓扑编写单元测试。它基本上是一个 CoFlatMapFunction
,并且有 2 个输入。
我试图从这个页面中获得一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
输入的顺序对我的拓扑很重要,所以当我测试时,我不能对每个输入使用 StreamExecutionEnvironment#fromCollection
,因为我无法控制每个输入中注入数据点的顺序输入。
我尝试使用 StreamExecutionEnvironment#fromCollection
创建单个输入并根据它们的类型将每个元素分派到我的 CoFlatMapFunction
的实际输入,但元素的顺序在此操作中丢失.
还有其他方法可以写这个测试吗?
您想使用 TwoInputStreamOperatorTestHarness
class。不幸的是文档有点稀疏。我有一个使用此 class 的测试,但尚未推送到 133_stream-test-harness branch of flink-crawler。
flink训练练习中有使用TwoInputStreamOperatorTestHarness的例子可以参考:
您将需要这些依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
您应该记住,这不是 public 受支持的界面,因此它可能会以意想不到的方式发展。
我需要为 flink 流拓扑编写单元测试。它基本上是一个 CoFlatMapFunction
,并且有 2 个输入。
我试图从这个页面中获得一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
输入的顺序对我的拓扑很重要,所以当我测试时,我不能对每个输入使用 StreamExecutionEnvironment#fromCollection
,因为我无法控制每个输入中注入数据点的顺序输入。
我尝试使用 StreamExecutionEnvironment#fromCollection
创建单个输入并根据它们的类型将每个元素分派到我的 CoFlatMapFunction
的实际输入,但元素的顺序在此操作中丢失.
还有其他方法可以写这个测试吗?
您想使用 TwoInputStreamOperatorTestHarness
class。不幸的是文档有点稀疏。我有一个使用此 class 的测试,但尚未推送到 133_stream-test-harness branch of flink-crawler。
flink训练练习中有使用TwoInputStreamOperatorTestHarness的例子可以参考:
您将需要这些依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
您应该记住,这不是 public 受支持的界面,因此它可能会以意想不到的方式发展。