我可以在重复运行的计划任务中执行“阻塞订阅”吗?
Can I do `blocking-subscribe` in a scheduled task which runs repeatedly?
最近我发现我的 Clojure/Ring/Jetty 服务器在我 cider-connect
进入它并做某事时反复进入 NoClassDefFoundError
。我想那是因为线程池被一些死线程耗尽了。
然后我在服务器中找到了这个函数,它通过定义 clojurewerkz.quartzite.scheduler
job:
每天运行一次
(defn consumer-msgs-announcement
[rabbitmq queue-name & args]
(with-open [conn (lc/connect rabbitmq)]
(let [ch (lch/open conn)]
(lq/declare ch queue-name {:durable false :auto-delete false})
(println " [*] Waiting for messages. To exit press CTRL+C")
;; (lcons/blocking-subscribe ch queue-name msg-queue/post-wxmsg-handle-delivery {:auto-ack true})
(lcons/blocking-subscribe ch queue-name handle-delivery-announcement {:auto-ack true})
)))
其中包含的包定义为:
[langohr.core :as lc]
[langohr.channel :as lch]
[clojure.string :as str]
[langohr.queue :as lq]
[langohr.consumers :as lcons]
我怀疑 blocking-subscribe
会导致它曾经永远阻塞的线程耗尽 JVM 的线程池,最终导致 NoClassDefFoundError
错误。
我不太确定,但是我可以在重复运行的计划任务中执行blocking-subscribe
吗?
我上面描述的事情可能吗?还是我的推导有问题?
谢谢。
根据我的经验,blocking-subscribe
会无限期地阻塞线程。您可以快速检查 source 的 blocking-subscribe
以查看它是否进入了可能无穷无尽的惰性序列的 do-seq。
您似乎只想让您的作业收集自上次 运行 以来已在队列中备份的所有现有消息。对于这种访问模式,这里有几个选项对我来说很重要。
您可以使用 langohr.basic/get 来处理队列中的消息,直到收到 nil
。
您可以使用langohr.consumers/subscribe
创建一个简单的非阻塞消费者
- 启动消费者
- 保留您获得的消费者标签
- 睡眠预定时间
- 使用langohr取消消费者。basic/cancel
P.S。我无法 post link 到 subscribe/cancel
文档,但在源代码和 langohr API 参考中很容易找到。
最近我发现我的 Clojure/Ring/Jetty 服务器在我 cider-connect
进入它并做某事时反复进入 NoClassDefFoundError
。我想那是因为线程池被一些死线程耗尽了。
然后我在服务器中找到了这个函数,它通过定义 clojurewerkz.quartzite.scheduler
job:
(defn consumer-msgs-announcement
[rabbitmq queue-name & args]
(with-open [conn (lc/connect rabbitmq)]
(let [ch (lch/open conn)]
(lq/declare ch queue-name {:durable false :auto-delete false})
(println " [*] Waiting for messages. To exit press CTRL+C")
;; (lcons/blocking-subscribe ch queue-name msg-queue/post-wxmsg-handle-delivery {:auto-ack true})
(lcons/blocking-subscribe ch queue-name handle-delivery-announcement {:auto-ack true})
)))
其中包含的包定义为:
[langohr.core :as lc]
[langohr.channel :as lch]
[clojure.string :as str]
[langohr.queue :as lq]
[langohr.consumers :as lcons]
我怀疑 blocking-subscribe
会导致它曾经永远阻塞的线程耗尽 JVM 的线程池,最终导致 NoClassDefFoundError
错误。
我不太确定,但是我可以在重复运行的计划任务中执行blocking-subscribe
吗?
我上面描述的事情可能吗?还是我的推导有问题?
谢谢。
根据我的经验,blocking-subscribe
会无限期地阻塞线程。您可以快速检查 source 的 blocking-subscribe
以查看它是否进入了可能无穷无尽的惰性序列的 do-seq。
您似乎只想让您的作业收集自上次 运行 以来已在队列中备份的所有现有消息。对于这种访问模式,这里有几个选项对我来说很重要。
您可以使用 langohr.basic/get 来处理队列中的消息,直到收到
nil
。您可以使用
langohr.consumers/subscribe
创建一个简单的非阻塞消费者- 启动消费者
- 保留您获得的消费者标签
- 睡眠预定时间
- 使用langohr取消消费者。basic/cancel
P.S。我无法 post link 到 subscribe/cancel
文档,但在源代码和 langohr API 参考中很容易找到。