我可以在重复运行的计划任务中执行“阻塞订阅”吗?

Can I do `blocking-subscribe` in a scheduled task which runs repeatedly?

最近我发现我的 Clojure/Ring/Jetty 服务器在我 cider-connect 进入它并做某事时反复进入 NoClassDefFoundError。我想那是因为线程池被一些死线程耗尽了。

然后我在服务器中找到了这个函数,它通过定义 clojurewerkz.quartzite.scheduler job:

每天运行一次
(defn consumer-msgs-announcement
  [rabbitmq queue-name & args]
  (with-open [conn (lc/connect rabbitmq)]
    (let [ch  (lch/open conn)]
      (lq/declare ch queue-name {:durable false :auto-delete false})
      (println " [*] Waiting for messages. To exit press CTRL+C")
      ;; (lcons/blocking-subscribe ch queue-name msg-queue/post-wxmsg-handle-delivery {:auto-ack true})
      (lcons/blocking-subscribe ch queue-name handle-delivery-announcement {:auto-ack true})
      )))

其中包含的包定义为:

   [langohr.core :as lc]
   [langohr.channel :as lch]
   [clojure.string :as str]
   [langohr.queue :as lq]
   [langohr.consumers :as lcons]

我怀疑 blocking-subscribe 会导致它曾经永远阻塞的线程耗尽 JVM 的线程池,最终导致 NoClassDefFoundError 错误。

我不太确定,但是我可以在重复运行的计划任务中执行blocking-subscribe吗?

我上面描述的事情可能吗?还是我的推导有问题?

谢谢。

根据我的经验,blocking-subscribe 会无限期地阻塞线程。您可以快速检查 sourceblocking-subscribe 以查看它是否进入了可能无穷无尽的惰性序列的 do-seq。

您似乎只想让您的作业收集自上次 运行 以来已在队列中备份的所有现有消息。对于这种访问模式,这里有几个选项对我来说很重要。

  1. 您可以使用 langohr.basic/get 来处理队列中的消息,直到收到 nil

  2. 您可以使用langohr.consumers/subscribe创建一个简单的非阻塞消费者

    • 启动消费者
    • 保留您获得的消费者标签
    • 睡眠预定时间
    • 使用langohr取消消费者。basic/cancel

P.S。我无法 post link 到 subscribe/cancel 文档,但在源代码和 langohr API 参考中很容易找到。