Ruby 线程和 Websocket
Ruby Threads and Websockets
这已经困扰我一段时间了——这似乎是完全可行的事情,但我被卡住了。
我有一个小的 Ruby 程序,它只是充当中间人。它运行很长时间(几分钟),阻塞操作(通过 FFI 接口),但应该通过 DDP 连接通过回调将它从该操作获得的定期更新发送到主 Meteor 应用程序。
这个程序的两个组件都是独立工作的。通过我自己开发的系统以及 metybur gem,我能够与 Meteor 应用程序进行通信。而且,如果我只是使用 puts
从 FFI 接口的回调中输出数据,我也能完美地获得这些数据。 (除此之外,由于另一个原因我不能完全确定,如果 FFI/blocking 动作在 Thread.new
块中,它会默默地失败。)
但是,出于某种原因,当我尝试将数据发送到 Meteor 应用程序时,没有任何反应。 ws.send
(在 EventMachine 上)returns true
,尽管实际上从未被调用,即使我将它放在自己的 Thread.new
块中。
我的一部分人怀疑(虽然无法弄清楚如何测试它)连接丢失是因为 Ruby 应用程序无法在阻塞期间处理 ping/pong keepalive 请求。
我已经尝试 EM.spawn
从 EventMachine 进行阻塞进程,我已经尝试在它自己的线程中启动 EventMachine,但似乎没有任何效果。
想知道是否有类似这样的最佳实践,即使在 CPU 密集阻塞操作期间也能保持应用程序的 EventMachine 部分响应?
已编辑
经过我们在评论中的讨论,我决定查看我发布的代码并编写一个利用 Iodine's Websocket 客户端(我更喜欢,因为我是作者)的小型 DDP 封装。
我不得不承认,我在思考这个问题时真的很开心。
附件是使用 Iodine 的 Meteor 连接器的简化代码。这是真正基本的,仅包括:断开连接时更新连接、完成握手和应答乒乓球。
要将此代码与第一个答案中启动的 FFI 工作流概念一起使用,请使用:
# create the connection to the Meteor server
# and setup the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
Iodine.debug "got message #{message}, it's a Hash"
end
# next, create a dedicated thread for the FFI,
# it will run until the FFI had finished
# or the application exits.
Thread.new do
# initialize FFI interface
data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
# imagine it takes time
sleep 1
# Meteor will respond to these with error messages
(meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
sleep 1
Iodine.signal_exit
end
# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }
关于 Meteor DDP 连接 class,这可能是这样实现的:
require 'iodine/client'
class IodineDDP
attr_reader :session
attr_reader :server_id
def initialize url, &block
@url = url
@ddp_initialized = false
@session = nil
@server_id = nil
@block = block
@closed = false
connect_websocket
end
def << message
Iodine.debug "Writing message #{message}"
ensure_connection
@ws << message
end
alias :write :<<
def close
@closed = true
@ws.on_close { nil }
@ws.close
end
protected
def on_message data
# make sure the DDP handshake is complete
return handshake data unless @ddp_initialized
data = JSON.parse(data)
Iodine.debug "Got message: #{data}"
return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
return true if data['msg'] == 'pong'
@block.call data
end
def on_close
@ddp_initialized = false
connect_websocket
end
def ensure_connection
return true unless @ws.closed? || !@ddp_initialized
raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
timeout = Iodine.time + 3
sleep 0.2 until @ddp_initialized && Iodine.time <= timeout
raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
end
def connect_websocket
@___on_message_proc ||= method(:on_message)
@___on_close_proc ||= method(:on_close)
@ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
# inform
Iodine.debug "initiating a new DDP connection to #{@url}"
# start the DDP handshake
handshake
end
def handshake last_message = nil
raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
unless last_message # this is the first message sent
Iodine.debug "Meteor DDP handshake initiated."
msg = {msg: "connect", version: "1", support: ["1"]}
msg[:session] = @session if @session
return(@ws << msg.to_json)
end
message = JSON.parse(last_message)
raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
if message['msg'] == 'connected'
# inform
Iodine.debug "Meteor DDP handshake complete."
@session = message['session']
return @ddp_initialized = true
else
return @server_id = message['server_id'] if message['server_id']
Iodine.error "Invalid handshake data - closing connection."
close
end
end
end
# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3
# # if we are inside a larger application, call:
# Iodine.force_start!
# # if we are on irb:
exit
# no need to write anything if this is the whole of the script
这已经困扰我一段时间了——这似乎是完全可行的事情,但我被卡住了。
我有一个小的 Ruby 程序,它只是充当中间人。它运行很长时间(几分钟),阻塞操作(通过 FFI 接口),但应该通过 DDP 连接通过回调将它从该操作获得的定期更新发送到主 Meteor 应用程序。
这个程序的两个组件都是独立工作的。通过我自己开发的系统以及 metybur gem,我能够与 Meteor 应用程序进行通信。而且,如果我只是使用 puts
从 FFI 接口的回调中输出数据,我也能完美地获得这些数据。 (除此之外,由于另一个原因我不能完全确定,如果 FFI/blocking 动作在 Thread.new
块中,它会默默地失败。)
但是,出于某种原因,当我尝试将数据发送到 Meteor 应用程序时,没有任何反应。 ws.send
(在 EventMachine 上)returns true
,尽管实际上从未被调用,即使我将它放在自己的 Thread.new
块中。
我的一部分人怀疑(虽然无法弄清楚如何测试它)连接丢失是因为 Ruby 应用程序无法在阻塞期间处理 ping/pong keepalive 请求。
我已经尝试 EM.spawn
从 EventMachine 进行阻塞进程,我已经尝试在它自己的线程中启动 EventMachine,但似乎没有任何效果。
想知道是否有类似这样的最佳实践,即使在 CPU 密集阻塞操作期间也能保持应用程序的 EventMachine 部分响应?
已编辑
经过我们在评论中的讨论,我决定查看我发布的代码并编写一个利用 Iodine's Websocket 客户端(我更喜欢,因为我是作者)的小型 DDP 封装。
我不得不承认,我在思考这个问题时真的很开心。 附件是使用 Iodine 的 Meteor 连接器的简化代码。这是真正基本的,仅包括:断开连接时更新连接、完成握手和应答乒乓球。
要将此代码与第一个答案中启动的 FFI 工作流概念一起使用,请使用:
# create the connection to the Meteor server
# and setup the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
Iodine.debug "got message #{message}, it's a Hash"
end
# next, create a dedicated thread for the FFI,
# it will run until the FFI had finished
# or the application exits.
Thread.new do
# initialize FFI interface
data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
# imagine it takes time
sleep 1
# Meteor will respond to these with error messages
(meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
sleep 1
Iodine.signal_exit
end
# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }
关于 Meteor DDP 连接 class,这可能是这样实现的:
require 'iodine/client'
class IodineDDP
attr_reader :session
attr_reader :server_id
def initialize url, &block
@url = url
@ddp_initialized = false
@session = nil
@server_id = nil
@block = block
@closed = false
connect_websocket
end
def << message
Iodine.debug "Writing message #{message}"
ensure_connection
@ws << message
end
alias :write :<<
def close
@closed = true
@ws.on_close { nil }
@ws.close
end
protected
def on_message data
# make sure the DDP handshake is complete
return handshake data unless @ddp_initialized
data = JSON.parse(data)
Iodine.debug "Got message: #{data}"
return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
return true if data['msg'] == 'pong'
@block.call data
end
def on_close
@ddp_initialized = false
connect_websocket
end
def ensure_connection
return true unless @ws.closed? || !@ddp_initialized
raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
timeout = Iodine.time + 3
sleep 0.2 until @ddp_initialized && Iodine.time <= timeout
raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
end
def connect_websocket
@___on_message_proc ||= method(:on_message)
@___on_close_proc ||= method(:on_close)
@ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
# inform
Iodine.debug "initiating a new DDP connection to #{@url}"
# start the DDP handshake
handshake
end
def handshake last_message = nil
raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
unless last_message # this is the first message sent
Iodine.debug "Meteor DDP handshake initiated."
msg = {msg: "connect", version: "1", support: ["1"]}
msg[:session] = @session if @session
return(@ws << msg.to_json)
end
message = JSON.parse(last_message)
raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
if message['msg'] == 'connected'
# inform
Iodine.debug "Meteor DDP handshake complete."
@session = message['session']
return @ddp_initialized = true
else
return @server_id = message['server_id'] if message['server_id']
Iodine.error "Invalid handshake data - closing connection."
close
end
end
end
# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3
# # if we are inside a larger application, call:
# Iodine.force_start!
# # if we are on irb:
exit
# no need to write anything if this is the whole of the script