在异步流期间打印项目
Print item during Async Stream
我正在尝试使用 kafka consumer/producer 构建一个基本的加密交易程序来获取 websocket 数据。
我的问题是,一旦我收到已使用的消息,我就会尝试创建一些代码来在分析消息后执行交易。在这种情况下,在函数 auto_trader 中,我想尝试打印“第二个任何东西”,但它被等待的 kafkaconsumer 函数阻止了。简化的代码如下所示:
'''
async def kafkaconsumer(some_topic, trade = False):
#does some for loop to return a message and sends the message to a function to storevalues#
await returns_values(first_msg, rec_msg, last_msg)
async def returns_values(first_msg, rec_msg, last_msg):
await asyncio.sleep(.01)
return [first_msg[1], rec_msg[1],rec_msg[2],rec_msg[3], rec_msg[4], rec_msg[5], rec_msg[6], last_msg[0]]
async def auto_trade(ticker):
print('anything')
reader = await kafkaconsumer(some_topic = 'ETHUSD', trade = True)
print('2nd anything') #this doesn't get printed out, instead I just continuously receive messages from the consumer
'''
它似乎卡在消息上,之后不会执行任何操作,即使我在 returns_values 函数中放置了几个等待睡眠项目。
我的设置是否存在根本性缺陷,或者我是否以某种方式滥用了 asyncio?
为了执行函数本身,我使用了以下内容:
'''
async def execute():
await asyncio.gather (
auto_trade('ETHUSD'),
auto_trade('BTCUSD'),
auto_trade('DOGE')
)
if __name__=='__main__':
trade = asyncio.run(execute())
这是个有趣的问题,我尝试使用以下代码重现您的行为:
import asyncio
from random import random
async def kafkaconsumer(name):
for i in [(name, 'says:', "wait me"), (name, 'says:', "please wait, I don't over yet"), (name, 'says:', "wait wait, I'm still here"), (name, 'says:', "okay I'm done, now you can leave")]:
await asyncio.sleep(random()*3)
print(i[0], i[1], i[2])
async def auto_trade(name):
print('wait for the', name)
await kafkaconsumer(name=name)
print('Ok I\'m done with', name, '. May to go to next step')
print('Next step with', name)
async def execute():
await asyncio.gather (
auto_trade('John'),
auto_trade('Bill'),
auto_trade('Frank')
)
if __name__=='__main__':
trade = asyncio.run(execute())
输出为:
wait for the John
wait for the Bill
wait for the Frank
Frank says: wait me
Frank says: please wait, I don't over yet
Bill says: wait me
John says: wait me
Frank says: wait wait, I'm still here
Bill says: please wait, I don't over yet
John says: please wait, I don't over yet
John says: wait wait, I'm still here
Bill says: wait wait, I'm still here
Frank says: okay I'm done, now you can leave
Ok I'm done with Frank . May to go to next step
Next step with Frank
Bill says: okay I'm done, now you can leave
Ok I'm done with Bill . May to go to next step
Next step with Bill
John says: okay I'm done, now you can leave
Ok I'm done with John . May to go to next step
Next step with John
所以,是的,await kafkaconsumer
正在等待 kafka 提供的所有消息结束。看起来代码隐藏在
后面
#does some for loop to return a message and sends the message to a function to storevalues#
是一种无限循环提供新消息的生成器。所以你永远不会到达 print('2nd anything')
没有 break
某些触发器的无限循环(或者 SIGTERM 例如)
所以这里不可能在不知道里面是什么的情况下推荐精确的解决方案kafkaconsumer
还有关于reader
,不确定,可能是代码简化错误,但是reader
没有意义,总是None
。 reader = await kafkaconsumer...
做与 await kafkaconsumer...
完全相同的事情。因此,如果您期望在实际代码中从中获得一些价值,那将是一个逻辑错误。您必须 return kafkaconsumer
中的内容(例如 return await returns_values(first_msg, rec_msg, last_msg)
)
我正在尝试使用 kafka consumer/producer 构建一个基本的加密交易程序来获取 websocket 数据。
我的问题是,一旦我收到已使用的消息,我就会尝试创建一些代码来在分析消息后执行交易。在这种情况下,在函数 auto_trader 中,我想尝试打印“第二个任何东西”,但它被等待的 kafkaconsumer 函数阻止了。简化的代码如下所示:
'''
async def kafkaconsumer(some_topic, trade = False):
#does some for loop to return a message and sends the message to a function to storevalues#
await returns_values(first_msg, rec_msg, last_msg)
async def returns_values(first_msg, rec_msg, last_msg):
await asyncio.sleep(.01)
return [first_msg[1], rec_msg[1],rec_msg[2],rec_msg[3], rec_msg[4], rec_msg[5], rec_msg[6], last_msg[0]]
async def auto_trade(ticker):
print('anything')
reader = await kafkaconsumer(some_topic = 'ETHUSD', trade = True)
print('2nd anything') #this doesn't get printed out, instead I just continuously receive messages from the consumer
'''
它似乎卡在消息上,之后不会执行任何操作,即使我在 returns_values 函数中放置了几个等待睡眠项目。
我的设置是否存在根本性缺陷,或者我是否以某种方式滥用了 asyncio?
为了执行函数本身,我使用了以下内容:
'''
async def execute():
await asyncio.gather (
auto_trade('ETHUSD'),
auto_trade('BTCUSD'),
auto_trade('DOGE')
)
if __name__=='__main__':
trade = asyncio.run(execute())
这是个有趣的问题,我尝试使用以下代码重现您的行为:
import asyncio
from random import random
async def kafkaconsumer(name):
for i in [(name, 'says:', "wait me"), (name, 'says:', "please wait, I don't over yet"), (name, 'says:', "wait wait, I'm still here"), (name, 'says:', "okay I'm done, now you can leave")]:
await asyncio.sleep(random()*3)
print(i[0], i[1], i[2])
async def auto_trade(name):
print('wait for the', name)
await kafkaconsumer(name=name)
print('Ok I\'m done with', name, '. May to go to next step')
print('Next step with', name)
async def execute():
await asyncio.gather (
auto_trade('John'),
auto_trade('Bill'),
auto_trade('Frank')
)
if __name__=='__main__':
trade = asyncio.run(execute())
输出为:
wait for the John
wait for the Bill
wait for the Frank
Frank says: wait me
Frank says: please wait, I don't over yet
Bill says: wait me
John says: wait me
Frank says: wait wait, I'm still here
Bill says: please wait, I don't over yet
John says: please wait, I don't over yet
John says: wait wait, I'm still here
Bill says: wait wait, I'm still here
Frank says: okay I'm done, now you can leave
Ok I'm done with Frank . May to go to next step
Next step with Frank
Bill says: okay I'm done, now you can leave
Ok I'm done with Bill . May to go to next step
Next step with Bill
John says: okay I'm done, now you can leave
Ok I'm done with John . May to go to next step
Next step with John
所以,是的,await kafkaconsumer
正在等待 kafka 提供的所有消息结束。看起来代码隐藏在
#does some for loop to return a message and sends the message to a function to storevalues#
是一种无限循环提供新消息的生成器。所以你永远不会到达 print('2nd anything')
没有 break
某些触发器的无限循环(或者 SIGTERM 例如)
所以这里不可能在不知道里面是什么的情况下推荐精确的解决方案kafkaconsumer
还有关于reader
,不确定,可能是代码简化错误,但是reader
没有意义,总是None
。 reader = await kafkaconsumer...
做与 await kafkaconsumer...
完全相同的事情。因此,如果您期望在实际代码中从中获得一些价值,那将是一个逻辑错误。您必须 return kafkaconsumer
中的内容(例如 return await returns_values(first_msg, rec_msg, last_msg)
)