在异步流期间打印项目

Print item during Async Stream

我正在尝试使用 kafka consumer/producer 构建一个基本的加密交易程序来获取 websocket 数据。

我的问题是,一旦我收到已使用的消息,我就会尝试创建一些代码来在分析消息后执行交易。在这种情况下,在函数 auto_trader 中,我想尝试打印“第二个任何东西”,但它被等待的 kafkaconsumer 函数阻止了。简化的代码如下所示:

'''

async def kafkaconsumer(some_topic, trade = False):
   #does some for loop to return a message and sends the message to a function to storevalues#
   await returns_values(first_msg, rec_msg, last_msg)

async def returns_values(first_msg, rec_msg, last_msg):
   await asyncio.sleep(.01)
   return [first_msg[1], rec_msg[1],rec_msg[2],rec_msg[3], rec_msg[4], rec_msg[5], rec_msg[6], last_msg[0]]

async def auto_trade(ticker):
   print('anything')
   reader = await kafkaconsumer(some_topic = 'ETHUSD', trade = True)
   print('2nd anything') #this doesn't get printed out, instead I just continuously receive messages from the consumer

'''

它似乎卡在消息上,之后不会执行任何操作,即使我在 returns_values 函数中放置了几个等待睡眠项目。

我的设置是否存在根本性缺陷,或者我是否以某种方式滥用了 asyncio?

为了执行函数本身,我使用了以下内容:

'''

async def execute():
   await asyncio.gather (
   auto_trade('ETHUSD'),
   auto_trade('BTCUSD'),
   auto_trade('DOGE')
   )

if __name__=='__main__':
   trade = asyncio.run(execute())

这是个有趣的问题,我尝试使用以下代码重现您的行为:

import asyncio
from random import random

async def kafkaconsumer(name):
   for i in [(name, 'says:', "wait me"), (name, 'says:', "please wait, I don't over yet"), (name, 'says:', "wait wait, I'm still here"), (name, 'says:', "okay I'm done, now you can leave")]:
     await asyncio.sleep(random()*3)
     print(i[0], i[1], i[2])

async def auto_trade(name):
   print('wait for the', name)
   await kafkaconsumer(name=name)
   print('Ok I\'m done with', name, '. May to go to next step')
   print('Next step with', name)

async def execute():
   await asyncio.gather (
   auto_trade('John'),
   auto_trade('Bill'),
   auto_trade('Frank')
   )

if __name__=='__main__':
   trade = asyncio.run(execute())

输出为:

wait for the John
wait for the Bill
wait for the Frank
Frank says: wait me
Frank says: please wait, I don't over yet
Bill says: wait me
John says: wait me
Frank says: wait wait, I'm still here
Bill says: please wait, I don't over yet
John says: please wait, I don't over yet
John says: wait wait, I'm still here
Bill says: wait wait, I'm still here
Frank says: okay I'm done, now you can leave
Ok I'm done with Frank . May to go to next step
Next step with Frank
Bill says: okay I'm done, now you can leave
Ok I'm done with Bill . May to go to next step
Next step with Bill
John says: okay I'm done, now you can leave
Ok I'm done with John . May to go to next step
Next step with John

所以,是的,await kafkaconsumer 正在等待 kafka 提供的所有消息结束。看起来代码隐藏在

后面

#does some for loop to return a message and sends the message to a function to storevalues#

是一种无限循环提供新消息的生成器。所以你永远不会到达 print('2nd anything') 没有 break 某些触发器的无限循环(或者 SIGTERM 例如)

所以这里不可能在不知道里面是什么的情况下推荐精确的解决方案kafkaconsumer

还有关于reader,不确定,可能是代码简化错误,但是reader没有意义,总是Nonereader = await kafkaconsumer... 做与 await kafkaconsumer... 完全相同的事情。因此,如果您期望在实际代码中从中获得一些价值,那将是一个逻辑错误。您必须 return kafkaconsumer 中的内容(例如 return await returns_values(first_msg, rec_msg, last_msg)