'promisify' (async/await) 在 Python 中使用 pymsteams 向 Microsoft Teams 发送消息

'promisify' (async/await) sending a message to Microsoft Teams using pymsteams in Python

我在 Python 脚本中使用 logging 模块向 MS Teams 发送大量消息。不幸的是,这很慢,所以我想向消息添加 async/await 功能。

这是我的记录器模块(有些简化):

from logging import StreamHandler
import pymsteams

class TeamsHandler(StreamHandler):

    def __init__(self, channel_url):
        StreamHandler.__init__(self)
        self.channel_url = channel_url
        self.client = pymsteams.connectorcard(self.channel_url)

    def emit(self, record):
        msg = self.format(record)
        self.client.text(msg)
        try:
            self.client.send()
        except:
            print(f"{msg} could not be sent to Teams")

然后您将在常规脚本中使用它:

import logging
from TeamsHandler import TeamsHandler #the module above

my_logger = logging.getLogger('TestLogging')
my_logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
my_logger.addHandler(console_handler)

CHANNEL_ID = "https://outlook.office.com/webhook/your-big-number"
teamshandler = TeamsHandler(CHANNEL_ID)
teamshandler.setFormatter(logging.Formatter('%(levelname)s %(message)s'))
teamshandler.setLevel(logging.DEBUG)
my_logger.addHandler(teamshandler)
for i in range(1,100):
    my_logger.error(f"this is an error [{i}]")
    my_logger.info(f"this is an info [{i}]")
do_something_else()

我怎样才能做到 do_something_else 立即(几乎)执行,而不必等待 200 条消息到达 Teams?

我尝试在 TeamsHandler 模块中添加一些 asyncawait 关键字,但没有成功,所以我没有将它们放在问题中。如果不是完整的解决方案,很高兴得到一些指示。

理想情况下,消息的顺序应随着脚本的进行而保持不变。

如果 pymsteams 不支持 async/await,那么将 async 添加到您的函数中不会真正帮助您,因为您最终仍会从 [= 调用同步代码11=]。即使它确实支持 async/await,它仍然无法工作,因为您是从 Python 日志记录 API 内部调用它们,它本身不是异步的。 Async/await 无法神奇地将同步代码转换为异步,程序必须全面使用 async/await。

但是如果您需要在后台执行 运行 某种意义上的异步执行,则可以改用线程。比如创建一个专门的线程用于日志记录,如:

class TeamsHandler(StreamHandler):
    def __init__(self, channel_url):
        super().__init__()
        self.channel_url = channel_url
        self.client = pymsteams.connectorcard(self.channel_url)
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._worker)
        self.thread.start()
        # shutdown the worker at process exit
        atexit.register(self.queue.put, None)

    def _worker(self):
        while True:
            record = self.queue.get()
            if record is None:
                break
            msg = self.format(record)
            self.client.text(msg)
            try:
                self.client.send()
            except:
                print(f"{msg} could not be sent to Teams")

    def emit(self, record):
        # enqueue the record to log and return control to the caller
        self.queue.put(record)

当然,这有一个缺点,即如果日志记录后端出现问题,您的程序可能会比您在日志中看到的内容提前很多——但是当您删除日志记录和日志记录之间的同步时,情况总是如此。执行。