如何模拟不同模块的鼠兔连接?

How to mock a pika connection for a different module?

我有一个 class 导入以下模块:

import pika
import pickle
from apscheduler.schedulers.background import BackgroundScheduler
import time
import logging
class RabbitMQ():
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()
        self.sched = BackgroundScheduler()
        self.sched.add_job(self.keep_connection_alive, id='clean_old_data', trigger='cron', hour = '*', minute='*', second='*/50')
        self.sched.start()
    def publish_message(self, message , path="path"):
        message["path"] = path
        logging.info(message)
        message = pickle.dumps(message)
        self.channel.basic_publish(exchange="", routing_key="server", body=message)
    def keep_connection_alive(self):
        self.connection.process_data_events()
rabbitMQ = RabbitMQ()
def publish_message(message , path="path"):
    rabbitMQ.publish_message(message, path=path)

我的class.py:

import RabbitMQ as rq
class MyClass():
...

在为 MyClass 生成单元测试时,我无法模拟这部分代码的连接。并不断抛出异常。而且它根本不起作用

pika.exceptions.ConnectionClosed: Connection to 127.0.0.1:5672 failed: [Errno 111] Connection refused

我尝试了几种方法来模拟此连接,但其中 none 似乎有效。我想知道我能做些什么来支持这种测试?模拟整个 RabbitMQ 模块?或者也许只模拟连接

就像上面的评论者提到的,问题是您 RabbitMQ.

的全局创建

我下意识的反应是 "just get rid of that, and your module-level publish_message"。如果你能做到这一点,那就去寻求那个解决方案。您的 RabbitMQ class 上有一个 publish_message 接受相同的参数;然后任何调用者都应该创建你的 RabbitMQ class.

的实例

如果您不想要不能出于任何原因这样做,您应该只移动 move 的实例化模块级 publish_message 中的对象实例化如下:

def publish_message(message , path="path"):
    rabbitMQ = RabbitMQ()
    rabbitMQ.publish_message(message, path=path)

虽然每次调用它都会创建一个新连接。也许这没关系……但也许不是。因此,为了避免创建重复连接,您需要引入类似单例模​​式的东西:

class RabbitMQ():
    __instance = None

    ...

    @classmethod
    def get_instance(cls):
        if cls.__instance is None:
            cls.__instance = RabbitMQ()
        return cls.__instance

def publish_message(message , path="path"):
    RabbitMQ.get_instance().publish_message(message, path=path)

理想情况下,您会希望完全避免单例模式。任何调用者都应该存储 RabbitMQ 对象的单个实例并直接对其调用 publish_message

所以 TLDR/ideal 解决方案 IMO:只需删除最后 3 行。调用者应该创建一个 RabbitMQ 对象。

编辑:哦,为什么 它正在发生——当您导入该模块时,正在对其进行评估:rabbitMQ = RabbitMQ()。您模拟它的尝试是在评估之后发生的,但无法连接。