如何重新处理 ruby 中抛出异常的线程中的项目?

How to reprocess items in threads throwing exceptions in ruby?

我正在使用线程和队列处理项目,但有时会为特定项目抛出异常,因此它不会得到处理。

我的想法是将有问题的项目放回队列中,以便可以再次处理它并继续将其放回队列中,直到不再抛出异常为止。

尽管我打算重新处理项目,但此代码只处理队列一次:

#assume this queue is immediately filled with items
item_queue = Queue.new 

define_method (:processItem) {|item|
  begin
    #do something with item
  #Bad style below: will work in specific exception handling later
  rescue Exception => ex 
    #something happened, so put it back in the queue
    item_queue << item
    return
  end
  #more processing here, if 'begin' was successful
}

threads = []    

until item_queue.empty?
  threads << Thread.new{ processItem(item_queue.pop) }
end

threads.each{|thread| thread.join}

我的想法是 Queue 是线程安全的,所以它可以像这样使用 - 但结果显示并非如此。

如何确保所有产生异常的项目都得到重新处理,直到所有项目都成功?

是的 Queue 是线程安全的,但您使用它的方式并不安全。

item_queue.empty? 可能 return true 线程完成之前。

until item_queue.empty? 中调用 Thread.join 将解决竞争条件问题,但最终会导致程序一次处理队列中的一项。

until item_queue.empty?
  Thread.new{ processItem(item_queue.pop) }.join
end

如果您希望队列中的项目以多线程方式处理,那么您需要预定义您想要的线程数,例如:

# three threads processing items in the queue
until item_queue.empty?
  t1 = Thread.new{ processItem(item_queue.pop) }
  t2 = Thread.new{ processItem(item_queue.pop) }
  t3 = Thread.new{ processItem(item_queue.pop) }
  t1.join 1
  t2.join 1
  t3.join 1
end