Python 嘲笑:修补 Python Pika 的 "basic_publish" 功能
Python mocking: Patching Python Pika's "basic_publish" function
考虑测试此代码:
import pika
class MQ_Client():
connection = None
channel = None
exchange_name="my_exchange"
def connect(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="mq_server")
def publish(self, message):
properties = pika.BasicProperties(content_type='text/json', delivery_mode=1)
if self.channel.basic_publish(exchange=self.exchange_name, routing_key='', properties=properties, body=message):
log.info("Successfully published this message to exchange \"" + self.exchange_name + "\": " + message)
else:
log.error("Failed to publish message \"" + message + "\" to the exchange \"" + self.exchange_name + "\"!")
raise Exception("Failed to publish message to the queue.")
我想做的是修补 Pika 的 basic_publish
方法以引发异常,并且可以使用一些帮助来弄清楚如何做到这一点。这是我上次尝试的结果:
@mock.patch.object(pika.BlockingConnection, 'channel')
def test_that_mq_publish_problems_return_error(self, mocked_channel):
with self.client as client:
mocked_channel.basic_publish.side_effect = Exception()
response = client.put("/api/users/bob", json="somedata")
self.assertEqual(response.status_code, 500)
任何关于如何使这项工作的建议将不胜感激!
我成功了。模拟实例方法与模拟 class 方法完全不同。阅读 this excellent article 后,我终于明白了如何做到这一点,事实证明,我需要修补 basic_publish
方法的全部是像这样稍微修改我的测试:
@mock.patch('mq_client.pika.BlockingConnection', spec=pika.BlockingConnection)
def test_that_mq_publish_problems_return_error(self, mocked_connection):
with self.client as client:
mocked_connection.return_value.channel.return_value.basic_publish.return_value = False
考虑测试此代码:
import pika
class MQ_Client():
connection = None
channel = None
exchange_name="my_exchange"
def connect(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="mq_server")
def publish(self, message):
properties = pika.BasicProperties(content_type='text/json', delivery_mode=1)
if self.channel.basic_publish(exchange=self.exchange_name, routing_key='', properties=properties, body=message):
log.info("Successfully published this message to exchange \"" + self.exchange_name + "\": " + message)
else:
log.error("Failed to publish message \"" + message + "\" to the exchange \"" + self.exchange_name + "\"!")
raise Exception("Failed to publish message to the queue.")
我想做的是修补 Pika 的 basic_publish
方法以引发异常,并且可以使用一些帮助来弄清楚如何做到这一点。这是我上次尝试的结果:
@mock.patch.object(pika.BlockingConnection, 'channel')
def test_that_mq_publish_problems_return_error(self, mocked_channel):
with self.client as client:
mocked_channel.basic_publish.side_effect = Exception()
response = client.put("/api/users/bob", json="somedata")
self.assertEqual(response.status_code, 500)
任何关于如何使这项工作的建议将不胜感激!
我成功了。模拟实例方法与模拟 class 方法完全不同。阅读 this excellent article 后,我终于明白了如何做到这一点,事实证明,我需要修补 basic_publish
方法的全部是像这样稍微修改我的测试:
@mock.patch('mq_client.pika.BlockingConnection', spec=pika.BlockingConnection)
def test_that_mq_publish_problems_return_error(self, mocked_connection):
with self.client as client:
mocked_connection.return_value.channel.return_value.basic_publish.return_value = False