运行 并发线程中的加密提要:线程中没有当前事件循环 'ThreadPoolExecutor-0_0'

run crypto feed in a concurrent thread: There is no current event loop in thread 'ThreadPoolExecutor-0_0'

我正在尝试使用加密提要同时下载数据。

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

上面这段代码可以运行成功。但是,我试图在后台 运行 它。所以我正在使用并发期货来提供帮助。

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)

但是,我得到了错误:

job2.result()


---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433 
    434             self._condition.wait(timeout)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

有人能帮帮我吗?非常感谢!

编辑:关注

def threadable():    
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()


executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()

我收到错误:关于事件循环,我似乎仍然遇到同样的错误...它可以解决吗?

RuntimeError                              Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
     11 job2.done()
     12 
---> 13 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

<ipython-input-47-05c023dd326f> in threadable()
      2     f = FeedHandler()
      3     f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4     f.run()
      5 
      6 

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.

在代码的单线程版本中,所有这三个语句以简单的顺序方式在同一个线程中执行:

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

在多线程版本中,您只将最后一行提交给执行器,因此它将 运行 在辅助线程中。但是这些语句,据我从你提供的代码中可以看出,仍然在主线程中执行:

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))

你怎么知道这会奏效?一般来说,这将取决于 Gateio 和 Feedhandler 的实现细节。您需要非常小心地将程序分割成不同线程中的 运行 部分,尤其是在涉及第三方库调用时。那么,祝你好运。

你可以试试这个:

def threadable():
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()

...

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)

那么,至少,您的整个步骤序列将在同一个线程中执行。

不过,我会担心那些回调。他们现在将 运行 在辅助线程中,您需要了解其后果。他们是否与用户界面程序交互?您的 UI 可能不支持多线程。

Executor 协议的使用在这里有点奇怪,因为您的函数没有 return 值。执行器在用于聚合 returned 值时最有用。您最好使用 threading 模块中的方法启动您需要的线程。