如何停止 Flume 的 HTTP 源
How to stop a Flume's HTTP source
我正在使用 Flume 1.4.0,我正在尝试以某种方式停止 Flume 的组件:
- 首先,停止来源。
- 然后等待通道内的所有事件都被接收器消耗。
- 消耗完所有事件后,停止通道和接收器。
上述任务由关闭挂钩执行,与 org.apache.flume.node.Application
中创建的挂钩相同(实际上,我正在开发自定义 Application
)。
我获取对源、通道和接收器的引用的方式是:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
关键是我得到了这个 NullPointerException
:
2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)
HTTPSource.java:165
是关于停止实现源的Http服务器部分的Jetty服务器,也就是那个好像是null的:
162 @Override
163 public void stop() {
164 try {
165 srv.stop();
166 srv.join();
167 srv = null;
168 } catch (Exception ex) {
169 LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170 }
171 sourceCounter.stop();
172 LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173 }
为什么为空?源工作正常,能够接收 Http 请求。
我猜这不是关闭 Flume 组件的正确方法...如果不是,那是什么?
谢谢!
原因是 srv 被多个线程共享(因此它是 volatile 声明)。 Flume 尝试调用 close 来终止源,并且这种情况发生了不止一次。第二次调用 stop() 失败,因为 srv 已经无效。
这种情况发生在你的情况而不是标准的普通 flume 代理中的原因可能是因为你没有更新 SourceCounter。查看 MonitoredCounterGroup 了解详细信息。
已修复。感谢 Erik 的指点,我调试了代码,直到我意识到每次调用 configurationProvider.getConfiguration()
语句时都会创建一个新的 MaterializedConfiguration
。这样的 物质化配置 是一整套 运行 源、通道和接收器。因此,我有几个相同来源的副本......哎呀!尽管如此,不知何故 Flume 足够聪明,可以检测到配置的多个 实现 ,我已经看到它关闭了所有重复的组件......但这包括 volatile Jetty 服务器的变量等等。
因此,与其这样做:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
现在我在 handleConfigurationEvent(MaterializedConfiguration conf)
获得了我需要的参考资料(它被覆盖):
@Override
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
sourcesRef = conf.getSourceRunners();
channelsRef = conf.getChannels();
sinksRef = conf.getSinkRunners();
super.handleConfigurationEvent(conf);
} // handleConfigurationEvent
再次感谢埃里克!
我正在使用 Flume 1.4.0,我正在尝试以某种方式停止 Flume 的组件:
- 首先,停止来源。
- 然后等待通道内的所有事件都被接收器消耗。
- 消耗完所有事件后,停止通道和接收器。
上述任务由关闭挂钩执行,与 org.apache.flume.node.Application
中创建的挂钩相同(实际上,我正在开发自定义 Application
)。
我获取对源、通道和接收器的引用的方式是:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
关键是我得到了这个 NullPointerException
:
2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)
HTTPSource.java:165
是关于停止实现源的Http服务器部分的Jetty服务器,也就是那个好像是null的:
162 @Override
163 public void stop() {
164 try {
165 srv.stop();
166 srv.join();
167 srv = null;
168 } catch (Exception ex) {
169 LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170 }
171 sourceCounter.stop();
172 LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173 }
为什么为空?源工作正常,能够接收 Http 请求。
我猜这不是关闭 Flume 组件的正确方法...如果不是,那是什么?
谢谢!
原因是 srv 被多个线程共享(因此它是 volatile 声明)。 Flume 尝试调用 close 来终止源,并且这种情况发生了不止一次。第二次调用 stop() 失败,因为 srv 已经无效。
这种情况发生在你的情况而不是标准的普通 flume 代理中的原因可能是因为你没有更新 SourceCounter。查看 MonitoredCounterGroup 了解详细信息。
已修复。感谢 Erik 的指点,我调试了代码,直到我意识到每次调用 configurationProvider.getConfiguration()
语句时都会创建一个新的 MaterializedConfiguration
。这样的 物质化配置 是一整套 运行 源、通道和接收器。因此,我有几个相同来源的副本......哎呀!尽管如此,不知何故 Flume 足够聪明,可以检测到配置的多个 实现 ,我已经看到它关闭了所有重复的组件......但这包括 volatile Jetty 服务器的变量等等。
因此,与其这样做:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
现在我在 handleConfigurationEvent(MaterializedConfiguration conf)
获得了我需要的参考资料(它被覆盖):
@Override
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
sourcesRef = conf.getSourceRunners();
channelsRef = conf.getChannels();
sinksRef = conf.getSinkRunners();
super.handleConfigurationEvent(conf);
} // handleConfigurationEvent
再次感谢埃里克!