如何在不使用 Play Framework 中的全局对象的情况下在应用程序启动时仅启动一次 Actor?
How to start an Actor only once at the Application Startup time without using Global object in Play Framework?
我一直听说 Global 对象将被弃用,将一些代码放在 Global class 的 .onStart() 中并不是一个好习惯。如果是这样,什么是替代方案?
一群人 here 说要使用全局对象,但这也是 4 年前的事了。
我目前有以下需求。我编写了一个正在运行的应用程序,它使用消息队列中的消息(在我的例子中,这是 NSQ(目前没有 Camel 绑定到它))并将这些消息发送到正在收听它的 Web Socket 客户端组。所以我希望消费者为每个主题初始化一次并永远启动 运行ning 直到应用程序本身终止。发生的事情是,当我向一个主题发送 10 条消息时,我只能看到 5 条消息通过,但是当我 运行 从下面的代码中使用完全相同的方法 "startNSQConsumer()" 在一个单独的 Plain old Java 应用程序本身我可以看到所有 10 条消息。很明显这与游戏有关,我不确定发生了什么。有什么想法吗?
public class Global extends GlobalSettings {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
@Override
public void onStart(Application application) {
EXECUTOR_SERVICE.submit(() -> {
startNSQConsumer();
});
super.onStart(application);
}
@Override
public void onStop(Application application) {
EXECUTOR_SERVICE.shutdown();
super.onStop(application);
}
private void startNSQConsumer() {
NSQLookup lookup = new DefaultNSQLookup();
Configuration config = Play.application().configuration();
List<String> topics = config.getStringList("TOPICS");
String nsqlookupdIP = config.getString("NSQLOOKUPD_IP");
int nsqlookupdPort = Integer.parseInt(config.getString("NSQLOOKUPD_PORT"));
lookup.addLookupAddress(nsqlookupdIP, nsqlookupdPort);
Logger.info("Using nsqlookup at " + nsqlookupdIP + ":" + nsqlookupdPort);
topics.forEach((topic) -> {
NSQConsumer consumer = new NSQConsumer(lookup, topic, topic + "_channel", (message) -> { // messages here will come from external system running on some other node
String receivedMessage = new String(message.getMessage());
//Logger.info(receivedMessage); // When I enable or disable this I only see 50% of the messages.
EventBusManager.getEventBus().publish(new MsgEnvelope(topic, receivedMessage)); // broadcast message to all Actors listening on a particular topic
message.finished();
}, new NSQConfig(), new NSQErrorCallback() {
@Override
public void error(NSQException x) {
Logger.error("Cause: " + x.getCause() + ", Message: " + x.getMessage());
}
});
consumer.start();
});
}
}
你有两个问题,我会先回答,然后你可以将你的代码重构为 运行 而没有 Global
,看看问题是否仍然存在。如果是这样,请创建另一个问题。
日程安排
在 Playframework 中,执行计划任务的最佳方式是 Akka actor 和 Akka scheduler:https://www.playframework.com/documentation/2.5.x/JavaAkka。
您需要将您的代码(没有 EXECUTOR_SERVICE
,只是有效的部分)放入 actor 中:
@Singleton
public class NSQRunner extends UntypedActor{
public static final String name = "NSQRunner";
...
@Override
public void onReceive(Object msg) throws Throwable {
if(msg instanceof NSQRunnerProtocol.StartConsumer){
startNSQConsumer();
} else {
unhandled(msg);
}
}
private void startNSQConsumer() {
...
}
}
您可以将此演员视为 Executors.newSingleThreadScheduledExecutor()
的一个主题。它会在收到告诉他这样做的消息时完成工作 - NSQRunnerProtocol.StartConsumer
。这是协议 class:
public class NSQRunnerProtocol {
/**
* Start the NSQ consumer.
*
*/
public static class StartConsumer{}
}
现在您需要在应用程序启动时 运行 它。
这里是调度程序任务,它将在启动后 1 秒后 运行 你的 actor:
@Singleton
public class Scheduler {
@Inject
public Scheduler(
final ActorSystem system,
final Configuration configuration,
@Named(NSQRunner.name) ActorRef nsqRunner) {
system.scheduler().scheduleOnce(
Duration.create(1, TimeUnit.SECONDS),
nsqRunner,
new NSQRunnerProtocol.StartConsumer(),
system.dispatcher(),
null);
}
}
现在您需要在应用程序中注册演员和调度程序。 Play框架有模块系统,所以你需要创建一个模块class:
package my.nsq
...
public class Module extends AbstractModule implements AkkaGuiceSupport {
@Override
protected void configure() {
// Bind actors.
bindActor(NSQRunner.class, NSQRunner.name);
// Bind scheduler, it will run on the start of application.
bind(Scheduler.class).asEagerSingleton();
}
}
并在conf/application.conf
中注册:
play.modules {
...
enabled += my.nsq.Module
}
停止
在这种情况下,如果您需要某些东西来运行 onStop
应用程序,那么您需要通过 addStopHook
方法注入 ApplicationLifecycle
和一个停止挂钩:https://www.playframework.com/documentation/2.5.x/PluginsToModules#Create-a-Module-class .
下面是一个数据库访问单例的例子:
@Singleton
public class Couchbase {
private static ALogger logger = Logger.of(Couchbase.class);
private final Cluster cluster;
private final Bucket bucket;
@Inject
public Couchbase(ApplicationLifecycle lifecycle, Configuration configuration) {
// Read the database configuration from the setting.
final String clusterName = configuration.getString("couchbase.cluster");
final String bucketName = configuration.getString("couchbase.bucket.name");
final String bucketPassword = configuration.getString("couchbase.bucket.password");
final CouchbaseEnvironment env = DefaultCouchbaseEnvironment
.builder()
.queryEndpoints(1)
.autoreleaseAfter(50000)
.queryTimeout(20000)
.retryStrategy(FailFastRetryStrategy.INSTANCE)
.reconnectDelay(Delay.fixed(5, TimeUnit.SECONDS))
.build();
logger.debug(
"Connect to the couche: cluster \"{}\", bucket: \"{}\"",
clusterName,
bucketName);
// Connect.
cluster = CouchbaseCluster.create(env, clusterName);
bucket = cluster.openBucket(bucketName, bucketPassword);
// Create a default index.
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
// Special operations on the application shutdown.
lifecycle.addStopHook(() -> {
// Close the connection on application shutdown.
env.shutdown();
cluster.disconnect();
// Couchbase close the connection synchronously
// so we need just return an empty CompletableFuture.
return CompletableFuture.completedFuture(null);
});
}
/**
* @return the application bucket
*/
public Bucket getBucket() {
return bucket;
}
}
您可以在 Module
中绑定此对象:
bind(Couchbase.class).asEagerSingleton();
然后你可以注入它,甚至在你的演员中:
public class NSQRunner ...
@Inject
public NSQRunner(Couchbase couchbase){
...
}
}
你可以像 Couchbase
那样在单身人士中尽你所能,但我认为演员要好得多。
我一直听说 Global 对象将被弃用,将一些代码放在 Global class 的 .onStart() 中并不是一个好习惯。如果是这样,什么是替代方案?
一群人 here 说要使用全局对象,但这也是 4 年前的事了。
我目前有以下需求。我编写了一个正在运行的应用程序,它使用消息队列中的消息(在我的例子中,这是 NSQ(目前没有 Camel 绑定到它))并将这些消息发送到正在收听它的 Web Socket 客户端组。所以我希望消费者为每个主题初始化一次并永远启动 运行ning 直到应用程序本身终止。发生的事情是,当我向一个主题发送 10 条消息时,我只能看到 5 条消息通过,但是当我 运行 从下面的代码中使用完全相同的方法 "startNSQConsumer()" 在一个单独的 Plain old Java 应用程序本身我可以看到所有 10 条消息。很明显这与游戏有关,我不确定发生了什么。有什么想法吗?
public class Global extends GlobalSettings {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
@Override
public void onStart(Application application) {
EXECUTOR_SERVICE.submit(() -> {
startNSQConsumer();
});
super.onStart(application);
}
@Override
public void onStop(Application application) {
EXECUTOR_SERVICE.shutdown();
super.onStop(application);
}
private void startNSQConsumer() {
NSQLookup lookup = new DefaultNSQLookup();
Configuration config = Play.application().configuration();
List<String> topics = config.getStringList("TOPICS");
String nsqlookupdIP = config.getString("NSQLOOKUPD_IP");
int nsqlookupdPort = Integer.parseInt(config.getString("NSQLOOKUPD_PORT"));
lookup.addLookupAddress(nsqlookupdIP, nsqlookupdPort);
Logger.info("Using nsqlookup at " + nsqlookupdIP + ":" + nsqlookupdPort);
topics.forEach((topic) -> {
NSQConsumer consumer = new NSQConsumer(lookup, topic, topic + "_channel", (message) -> { // messages here will come from external system running on some other node
String receivedMessage = new String(message.getMessage());
//Logger.info(receivedMessage); // When I enable or disable this I only see 50% of the messages.
EventBusManager.getEventBus().publish(new MsgEnvelope(topic, receivedMessage)); // broadcast message to all Actors listening on a particular topic
message.finished();
}, new NSQConfig(), new NSQErrorCallback() {
@Override
public void error(NSQException x) {
Logger.error("Cause: " + x.getCause() + ", Message: " + x.getMessage());
}
});
consumer.start();
});
}
}
你有两个问题,我会先回答,然后你可以将你的代码重构为 运行 而没有 Global
,看看问题是否仍然存在。如果是这样,请创建另一个问题。
日程安排
在 Playframework 中,执行计划任务的最佳方式是 Akka actor 和 Akka scheduler:https://www.playframework.com/documentation/2.5.x/JavaAkka。
您需要将您的代码(没有 EXECUTOR_SERVICE
,只是有效的部分)放入 actor 中:
@Singleton
public class NSQRunner extends UntypedActor{
public static final String name = "NSQRunner";
...
@Override
public void onReceive(Object msg) throws Throwable {
if(msg instanceof NSQRunnerProtocol.StartConsumer){
startNSQConsumer();
} else {
unhandled(msg);
}
}
private void startNSQConsumer() {
...
}
}
您可以将此演员视为 Executors.newSingleThreadScheduledExecutor()
的一个主题。它会在收到告诉他这样做的消息时完成工作 - NSQRunnerProtocol.StartConsumer
。这是协议 class:
public class NSQRunnerProtocol {
/**
* Start the NSQ consumer.
*
*/
public static class StartConsumer{}
}
现在您需要在应用程序启动时 运行 它。
这里是调度程序任务,它将在启动后 1 秒后 运行 你的 actor:
@Singleton
public class Scheduler {
@Inject
public Scheduler(
final ActorSystem system,
final Configuration configuration,
@Named(NSQRunner.name) ActorRef nsqRunner) {
system.scheduler().scheduleOnce(
Duration.create(1, TimeUnit.SECONDS),
nsqRunner,
new NSQRunnerProtocol.StartConsumer(),
system.dispatcher(),
null);
}
}
现在您需要在应用程序中注册演员和调度程序。 Play框架有模块系统,所以你需要创建一个模块class:
package my.nsq
...
public class Module extends AbstractModule implements AkkaGuiceSupport {
@Override
protected void configure() {
// Bind actors.
bindActor(NSQRunner.class, NSQRunner.name);
// Bind scheduler, it will run on the start of application.
bind(Scheduler.class).asEagerSingleton();
}
}
并在conf/application.conf
中注册:
play.modules {
...
enabled += my.nsq.Module
}
停止
在这种情况下,如果您需要某些东西来运行 onStop
应用程序,那么您需要通过 addStopHook
方法注入 ApplicationLifecycle
和一个停止挂钩:https://www.playframework.com/documentation/2.5.x/PluginsToModules#Create-a-Module-class .
下面是一个数据库访问单例的例子:
@Singleton
public class Couchbase {
private static ALogger logger = Logger.of(Couchbase.class);
private final Cluster cluster;
private final Bucket bucket;
@Inject
public Couchbase(ApplicationLifecycle lifecycle, Configuration configuration) {
// Read the database configuration from the setting.
final String clusterName = configuration.getString("couchbase.cluster");
final String bucketName = configuration.getString("couchbase.bucket.name");
final String bucketPassword = configuration.getString("couchbase.bucket.password");
final CouchbaseEnvironment env = DefaultCouchbaseEnvironment
.builder()
.queryEndpoints(1)
.autoreleaseAfter(50000)
.queryTimeout(20000)
.retryStrategy(FailFastRetryStrategy.INSTANCE)
.reconnectDelay(Delay.fixed(5, TimeUnit.SECONDS))
.build();
logger.debug(
"Connect to the couche: cluster \"{}\", bucket: \"{}\"",
clusterName,
bucketName);
// Connect.
cluster = CouchbaseCluster.create(env, clusterName);
bucket = cluster.openBucket(bucketName, bucketPassword);
// Create a default index.
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
// Special operations on the application shutdown.
lifecycle.addStopHook(() -> {
// Close the connection on application shutdown.
env.shutdown();
cluster.disconnect();
// Couchbase close the connection synchronously
// so we need just return an empty CompletableFuture.
return CompletableFuture.completedFuture(null);
});
}
/**
* @return the application bucket
*/
public Bucket getBucket() {
return bucket;
}
}
您可以在 Module
中绑定此对象:
bind(Couchbase.class).asEagerSingleton();
然后你可以注入它,甚至在你的演员中:
public class NSQRunner ...
@Inject
public NSQRunner(Couchbase couchbase){
...
}
}
你可以像 Couchbase
那样在单身人士中尽你所能,但我认为演员要好得多。