Ruby 线程和 Websocket

Ruby Threads and Websockets

这已经困扰我一段时间了——这似乎是完全可行的事情,但我被卡住了。

我有一个小的 Ruby 程序,它只是充当中间人。它运行很长时间(几分钟),阻塞操作(通过 FFI 接口),但应该通过 DDP 连接通过回调将它从该操作获得的定期更新发送到主 Meteor 应用程序。

这个程序的两个组件都是独立工作的。通过我自己开发的系统以及 metybur gem,我能够与 Meteor 应用程序进行通信。而且,如果我只是使用 puts 从 FFI 接口的回调中输出数据,我也能完美地获得这些数据。 (除此之外,由于另一个原因我不能完全确定,如果 FFI/blocking 动作在 Thread.new 块中,它会默默地失败。)

但是,出于某种原因,当我尝试将数据发送到 Meteor 应用程序时,没有任何反应。 ws.send(在 EventMachine 上)returns true,尽管实际上从未被调用,即使我将它放在自己的 Thread.new 块中。

我的一部分人怀疑(虽然无法弄清楚如何测试它)连接丢失是因为 Ruby 应用程序无法在阻塞期间处理 ping/pong keepalive 请求。

我已经尝试 EM.spawn 从 EventMachine 进行阻塞进程,我已经尝试在它自己的线程中启动 EventMachine,但似乎没有任何效果。

想知道是否有类似这样的最佳实践,即使在 CPU 密集阻塞操作期间也能保持应用程序的 EventMachine 部分响应?

已编辑

经过我们在评论中的讨论,我决定查看我发布的代码并编写一个利用 Iodine's Websocket 客户端(我更喜欢,因为我是作者)的小型 DDP 封装。

我不得不承认,我在思考这个问题时真的很开心。 附件是使用 Iodine 的 Meteor 连接器的简化代码。这是真正基本的,仅包括:断开连接时更新连接、完成握手和应答乒乓球。

要将此代码与第一个答案中启动的 FFI 工作流概念一起使用,请使用:

# create the connection to the Meteor server
# and setup the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
    Iodine.debug "got message #{message}, it's a Hash"
end

# next, create a dedicated thread for the FFI,
# it will run until the FFI had finished
# or the application exits.
Thread.new do
   # initialize FFI interface
   data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
   # imagine it takes time
   sleep 1
   # Meteor will respond to these with error messages 
   (meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
   sleep 1
   Iodine.signal_exit
end

# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }

关于 Meteor DDP 连接 class,这可能是这样实现的:

require 'iodine/client'

class IodineDDP
   attr_reader :session
   attr_reader :server_id   
   def initialize url, &block
      @url = url
      @ddp_initialized = false
      @session = nil
      @server_id = nil
      @block = block
      @closed = false
      connect_websocket
   end

   def << message
      Iodine.debug "Writing message #{message}"
      ensure_connection
      @ws << message
   end
   alias :write :<<

   def close
      @closed = true
      @ws.on_close { nil }
      @ws.close
   end

   protected

   def on_message data
      # make sure the DDP handshake is complete
      return handshake data unless @ddp_initialized
      data = JSON.parse(data)
      Iodine.debug "Got message: #{data}"
      return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
      return true if data['msg'] == 'pong'
      @block.call data
   end
   def on_close
      @ddp_initialized = false
      connect_websocket
   end

   def ensure_connection
      return true unless @ws.closed? || !@ddp_initialized
      raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
      raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
      timeout = Iodine.time + 3
      sleep 0.2 until @ddp_initialized && Iodine.time <= timeout
      raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
   end

   def connect_websocket
      @___on_message_proc ||= method(:on_message)
      @___on_close_proc ||= method(:on_close)
      @ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
      # inform
      Iodine.debug "initiating a new DDP connection to #{@url}"
      # start the DDP handshake
      handshake
   end
   def handshake last_message = nil
      raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
      unless last_message # this is the first message sent
         Iodine.debug "Meteor DDP handshake initiated."
         msg = {msg: "connect", version: "1", support: ["1"]}
         msg[:session] = @session if @session
         return(@ws << msg.to_json)
      end
      message = JSON.parse(last_message)
      raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
      if message['msg'] == 'connected'
         # inform
         Iodine.debug "Meteor DDP handshake complete."
         @session = message['session']
         return @ddp_initialized = true
      else
         return @server_id = message['server_id'] if message['server_id'] 
         Iodine.error "Invalid handshake data - closing connection."
         close
      end      
   end
end

# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3

# # if we are inside a larger application, call: 
# Iodine.force_start! 

# # if we are on irb:
exit
# no need to write anything if this is the whole of the script