连接到 STOMP 队列的订阅者不会被添加到循环分配中
Subscribers connecting to a STOMP queue don't get added to round-robin distribution
我在 RabbitMQ 之上使用 perl 的 Net::Stomp
来构建一个非常简单的跨平台分布式工作队列。这个想法是将一堆作业提交到一个队列名称,然后让许多工作人员完成作业并执行它们。
如果我启动 "workers" 并将 frames/jobs 提交到队列,我得到的正是我所期望的:工作人员以循环方式从队列中弹出帧。
如果我将作业提交到持久队列,然后启动多个 worker,只有第一个将作业从队列中拉出。
# submit.pl
use Net::Stomp;
use JSON::PP;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'a', passcode => 'a' } ) or die;
for (my $i=0; $i < 20; $i++) {
my $package = encode_json( { seed => $i } );
$stomp->send( { destination => '/queue/runs', body => $package } );
}
$stomp->disconnect;
# worker.pl
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'a', passcode => 'a' } );
$stomp->subscribe({
destination => '/queue/runs',
ack => 'client',
'activemq.prefetchSize' => 1 });
while (my $frame = $stomp->receive_frame) {
next unless defined $frame;
$stomp->ack( { frame => $frame } );
next unless $frame->body;
my $spec = decode_json $frame->body;
say $$ . " " . $spec->{seed};
sleep 1;
}
运行这个:
$ perl submit.pl && perl worker.pl & perl worker.pl
我希望看到的是这样的:
30026 0
30024 1
30026 2
30024 3
...
相反,我只看到第一个 "worker" 从队列中拉出帧。如果我终止第一个 "worker" 进程,第二个进程就会开始拉帧。
30024 0
30024 1
30024 2
30024 3
...
我希望这样的情况是,当工作人员订阅队列时,他们会立即开始以循环方式拉取帧。我宁愿不必编写明确的东西来处理这个问题。我假设协议中已经存在某种我忽略的机制,或者可能是 Net::Stomp
?
中的错误
正如我所说,如果工作人员在 submit.pl
之前 运行ning 是 运行。
,则循环调度将完美运行
在 Rabbitmq 的 STOMP 插件上,Net::Stomp
文档中显示的 activemq.prefetchSize
键没有任何作用。我所要做的就是将其更改为 'prefetch-count' => 1
,然后一切如我所料。
我在 RabbitMQ 之上使用 perl 的 Net::Stomp
来构建一个非常简单的跨平台分布式工作队列。这个想法是将一堆作业提交到一个队列名称,然后让许多工作人员完成作业并执行它们。
如果我启动 "workers" 并将 frames/jobs 提交到队列,我得到的正是我所期望的:工作人员以循环方式从队列中弹出帧。
如果我将作业提交到持久队列,然后启动多个 worker,只有第一个将作业从队列中拉出。
# submit.pl
use Net::Stomp;
use JSON::PP;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'a', passcode => 'a' } ) or die;
for (my $i=0; $i < 20; $i++) {
my $package = encode_json( { seed => $i } );
$stomp->send( { destination => '/queue/runs', body => $package } );
}
$stomp->disconnect;
# worker.pl
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'a', passcode => 'a' } );
$stomp->subscribe({
destination => '/queue/runs',
ack => 'client',
'activemq.prefetchSize' => 1 });
while (my $frame = $stomp->receive_frame) {
next unless defined $frame;
$stomp->ack( { frame => $frame } );
next unless $frame->body;
my $spec = decode_json $frame->body;
say $$ . " " . $spec->{seed};
sleep 1;
}
运行这个:
$ perl submit.pl && perl worker.pl & perl worker.pl
我希望看到的是这样的:
30026 0
30024 1
30026 2
30024 3
...
相反,我只看到第一个 "worker" 从队列中拉出帧。如果我终止第一个 "worker" 进程,第二个进程就会开始拉帧。
30024 0
30024 1
30024 2
30024 3
...
我希望这样的情况是,当工作人员订阅队列时,他们会立即开始以循环方式拉取帧。我宁愿不必编写明确的东西来处理这个问题。我假设协议中已经存在某种我忽略的机制,或者可能是 Net::Stomp
?
正如我所说,如果工作人员在 submit.pl
之前 运行ning 是 运行。
在 Rabbitmq 的 STOMP 插件上,Net::Stomp
文档中显示的 activemq.prefetchSize
键没有任何作用。我所要做的就是将其更改为 'prefetch-count' => 1
,然后一切如我所料。