线程内的 Asyncio 函数
Asyncio function inside a thread
cryptofeed 是一个 python 库,它使用 asyncio 库获取不同加密货币交易所的实时价格。
在这个简短的程序中,我们尝试 运行 独立线程中的 cryptofeed FeedHandler。代码示例如下所示:
import functools as fct
from cryptofeed import FeedHandler
from cryptofeed.defines import BID, ASK, L2_BOOK
from cryptofeed.exchanges import Kraken
from datetime import datetime
import threading
async def bookfunc(params , orderbooks, feed, symbol, book, timestamp, receipt_timestamp):
print(f'Timestamp: {timestamp} Cryptofeed Receipt: {receipt_timestamp} Feed: {feed} Symbol: {symbol}'
f' Book Bid Size is {len(book[BID])} Ask Size is {len(book[ASK])}')
orderbooks = filter_orderbook(orderbooks, book, symbol, params['orderbook']['depth'])
def func():
# Parameters
params = {'orderbook': {'depth': 2}, 'price_model':{}, 'trade_model': {}}
config = {'log': {'filename': 'logs/demo.log', 'level': 'INFO'}}
orderbooks = {}
f = FeedHandler(config=config)
f.add_feed(Kraken(checksum_validation=True, subscription={L2_BOOK: ['BTC-USD', 'ETH-USD', 'LINK-USD', 'LTC-USD', 'ADA-USD']},
callbacks={L2_BOOK: fct.partial(bookfunc, params, orderbooks)})) # This way passes the orderbooks inside the callback
f.run()
if __name__ == '__main__':
thread = threading.Thread(target=func, args=())
thread.start()
执行代码时,出现如下错误:
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
知道如何解决这个问题吗?
编辑:
这是针对Whosebug中不同问题的解决方案。一个例子是下面的问题:
Running cryptofeed (asyncio library) in a new thread
您必须通过 asyncio
为您的线程手动设置事件 循环。您可以通过将函数包装在某种循环中来实现 setter:
def create_loop(func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(func())
if __name__ == "__main__":
thread = threading.Thread(target=create_loop, args=(func,))
thread.start()
(从 cryptofeed==1.9.2
开始)有两件重要的事情可以从线程中实现 运行:
FeedHandler.run
默认设置信号处理程序, 必须
从主线程完成。为了避免这种情况,有
install_signal_handlers
方法参数。
-
FeedHandler
set's uvloop
's policy 的初始化器,但是
不调用 uvloop.install()
(假设它是应用程序的
我猜是责任)。没有它 asyncio.set_event_loop
没有效果。或者你可以设置 'uvloop': False
在 feed handler 配置中(如下所示),或者只是卸载
uvloop
.
import asyncio
import threading
from cryptofeed import FeedHandler
from cryptofeed.defines import BID, ASK, L2_BOOK
from cryptofeed.exchanges import Kraken
async def bookfunc(**kwargs):
print('bookfunc', kwargs)
def run_feed_handler_forever():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
config = {
'uvloop': False,
'log': {'filename': 'log.log', 'level': 'DEBUG'},
}
l2_book = ['BTC-USD', 'ETH-USD', 'LINK-USD', 'LTC-USD', 'ADA-USD']
feed = Kraken(
subscription={L2_BOOK: l2_book}, callbacks={L2_BOOK: bookfunc}
)
fh = FeedHandler(config)
fh.add_feed(feed)
fh.run(install_signal_handlers=False)
if __name__ == '__main__':
thread = threading.Thread(target=run_feed_handler_forever)
thread.start()
thread.join()
cryptofeed 是一个 python 库,它使用 asyncio 库获取不同加密货币交易所的实时价格。 在这个简短的程序中,我们尝试 运行 独立线程中的 cryptofeed FeedHandler。代码示例如下所示:
import functools as fct
from cryptofeed import FeedHandler
from cryptofeed.defines import BID, ASK, L2_BOOK
from cryptofeed.exchanges import Kraken
from datetime import datetime
import threading
async def bookfunc(params , orderbooks, feed, symbol, book, timestamp, receipt_timestamp):
print(f'Timestamp: {timestamp} Cryptofeed Receipt: {receipt_timestamp} Feed: {feed} Symbol: {symbol}'
f' Book Bid Size is {len(book[BID])} Ask Size is {len(book[ASK])}')
orderbooks = filter_orderbook(orderbooks, book, symbol, params['orderbook']['depth'])
def func():
# Parameters
params = {'orderbook': {'depth': 2}, 'price_model':{}, 'trade_model': {}}
config = {'log': {'filename': 'logs/demo.log', 'level': 'INFO'}}
orderbooks = {}
f = FeedHandler(config=config)
f.add_feed(Kraken(checksum_validation=True, subscription={L2_BOOK: ['BTC-USD', 'ETH-USD', 'LINK-USD', 'LTC-USD', 'ADA-USD']},
callbacks={L2_BOOK: fct.partial(bookfunc, params, orderbooks)})) # This way passes the orderbooks inside the callback
f.run()
if __name__ == '__main__':
thread = threading.Thread(target=func, args=())
thread.start()
执行代码时,出现如下错误:
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
知道如何解决这个问题吗?
编辑: 这是针对Whosebug中不同问题的解决方案。一个例子是下面的问题:
Running cryptofeed (asyncio library) in a new thread
您必须通过 asyncio
为您的线程手动设置事件 循环。您可以通过将函数包装在某种循环中来实现 setter:
def create_loop(func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(func())
if __name__ == "__main__":
thread = threading.Thread(target=create_loop, args=(func,))
thread.start()
(从 cryptofeed==1.9.2
开始)有两件重要的事情可以从线程中实现 运行:
FeedHandler.run
默认设置信号处理程序, 必须 从主线程完成。为了避免这种情况,有install_signal_handlers
方法参数。-
FeedHandler
set'suvloop
's policy 的初始化器,但是 不调用uvloop.install()
(假设它是应用程序的 我猜是责任)。没有它asyncio.set_event_loop
没有效果。或者你可以设置'uvloop': False
在 feed handler 配置中(如下所示),或者只是卸载uvloop
.
import asyncio
import threading
from cryptofeed import FeedHandler
from cryptofeed.defines import BID, ASK, L2_BOOK
from cryptofeed.exchanges import Kraken
async def bookfunc(**kwargs):
print('bookfunc', kwargs)
def run_feed_handler_forever():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
config = {
'uvloop': False,
'log': {'filename': 'log.log', 'level': 'DEBUG'},
}
l2_book = ['BTC-USD', 'ETH-USD', 'LINK-USD', 'LTC-USD', 'ADA-USD']
feed = Kraken(
subscription={L2_BOOK: l2_book}, callbacks={L2_BOOK: bookfunc}
)
fh = FeedHandler(config)
fh.add_feed(feed)
fh.run(install_signal_handlers=False)
if __name__ == '__main__':
thread = threading.Thread(target=run_feed_handler_forever)
thread.start()
thread.join()