Ruby NATS简单发布

Ruby NATS simple publishing

我正在 Ruby 学习 NATS,想从标准输入创建一个简单的 "cat"。要么我得到所有的行并且循环不退出,要么什么都没有发送。

!/usr/bin/env ruby

require "nats/client"

queue = ARGV.shift

NATS.start do
  STDIN.each_line do |line|
    puts "will send: #{line}"
    NATS.publish(queue, line)
  end

  NATS.stop
end

puts "... #{__LINE__}"
exit 0

我 运行 与:

# cat cat.rb | ./cat.rb myqueue

启用 NATS.stop 后,队列中不会显示任何内容,但它会正确显示它将尝试发送的每一行,并且程序会退出:

# cat cat.rb | ./cat.rb myqueue
will send: #!/usr/bin/env ruby
will send: 
will send: require "nats/client"
will send: 
will send: queue = ARGV.shift
will send: 
will send: NATS.start do
will send:   STDIN.each_line do |line|
will send:     puts "will send: #{line}"
will send:     NATS.publish(queue, line)
will send:   end
will send: 
will send:   NATS.stop
will send: end
will send: 
will send: puts "... #{__LINE__}"
will send: exit 0
will send: 
... 16
#

并且NATS.stop被注释掉,所有行都被发送到队列,但程序没有退出:

# cat cat.rb | ./cat.rb myqueue
will send: #!/usr/bin/env ruby
will send: 
will send: require "nats/client"
will send: 
will send: queue = ARGV.shift
will send: 
will send: NATS.start do
will send:   STDIN.each_line do |line|
will send:     puts "will send: #{line}"
will send:     NATS.publish(queue, line)
will send:   end
will send: 
will send:   # NATS.stop
will send: end
will send: 
will send: puts "... #{__LINE__}"
will send: exit 0
will send: 
<and program sits here>

我错过了什么?从 Ruby 发送一条不涉及多线程问题的消息不是更简单吗?

查看库 NATS.stop 的代码似乎非常激进,因为它会立即断开连接,而不检查是否还有一些处理要做。

因此,在 NATS.stop 之前调用 NATS.flush 似乎是解决方案,因为它应该确保在关闭连接之前处理所有未决事件:

NATS.start do
  ...do your messaging...

  NATS.flush  # Ensure processing of pending messages    
  NATS.stop   # Immidiate exit
end

对于您的示例,您可以尝试执行类似的操作,以便在停止事件循环之前发布所有行并刷新到服务器。

require 'nats/client'

$stdout.sync = true

NATS.start(servers: ["nats://127.0.0.1:4222"]) do |nats|
  STDIN.read.each_line do |line|
    nats.publish("hello", line)
  end
  nats.flush do
    nats.close
  end
end