Apache Flink:如何计算 DataStream 中的事件总数
Apache Flink: How to count the total number of events in a DataStream
我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和未加入的事件总数。我通过在 joinedEventDataStream
上使用地图来做到这一点,如下所示
joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {
@Override
public Object map(JoinedEvent joinedEvent) throws Exception {
number_of_joined_events += 1;
return null;
}
});
问题 # 1:这是计算流中事件数量的合适方法吗?
问题 # 2:我注意到了一种有线行为,有些人可能不相信。问题是,当我 运行 我在 IntelliJ IDE 中的 Flink 程序时,它显示 number_of_joined_events
的正确值,但 0
在我将此程序提交为 [= 的情况下14=]。所以当我 运行 程序作为 jar
文件而不是实际计数时,我得到 number_of_joined_events
的初始值。为什么这种情况只发生在 jar
文件提交的情况下而不是在 IDE 中?
您的方法无效。您在通过 JAR 文件执行程序时注意到的行为是预期的。
我不知道 number_of_joined_events
是如何定义的,但我假设它是您程序中的一个静态变量。当您 运行 您 IDE 中的程序时,它 运行 位于单个 JVM 中。因此,所有操作员都可以访问静态变量。当您将 JAR 文件提交到远程进程时,该程序将在不同的 JVM(可能是多个 JVM)中执行,并且您的客户端进程中的静态变量永远不会更新。
您可以使用 Flink 的指标或 ReduceFunction
总和 1
来计算已处理记录的数量。
我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和未加入的事件总数。我通过在 joinedEventDataStream
上使用地图来做到这一点,如下所示
joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {
@Override
public Object map(JoinedEvent joinedEvent) throws Exception {
number_of_joined_events += 1;
return null;
}
});
问题 # 1:这是计算流中事件数量的合适方法吗?
问题 # 2:我注意到了一种有线行为,有些人可能不相信。问题是,当我 运行 我在 IntelliJ IDE 中的 Flink 程序时,它显示 number_of_joined_events
的正确值,但 0
在我将此程序提交为 [= 的情况下14=]。所以当我 运行 程序作为 jar
文件而不是实际计数时,我得到 number_of_joined_events
的初始值。为什么这种情况只发生在 jar
文件提交的情况下而不是在 IDE 中?
您的方法无效。您在通过 JAR 文件执行程序时注意到的行为是预期的。
我不知道 number_of_joined_events
是如何定义的,但我假设它是您程序中的一个静态变量。当您 运行 您 IDE 中的程序时,它 运行 位于单个 JVM 中。因此,所有操作员都可以访问静态变量。当您将 JAR 文件提交到远程进程时,该程序将在不同的 JVM(可能是多个 JVM)中执行,并且您的客户端进程中的静态变量永远不会更新。
您可以使用 Flink 的指标或 ReduceFunction
总和 1
来计算已处理记录的数量。