Rabbitmq 多个消费者接收相同的消息,而不是循环

Rabbitmq multiple consumers receiving same message,instead of round robin

我正在尝试使用 rabbitmq-delayed-message-exchange 插件,它似乎可以正常发送延迟消息,该消息将在延迟间隔后接收。在消费一条消息时,我应该在 rabbitmq tutorial 2 中实现通常的循环方法,但我收到的是多个消费者消费同一条消息。客户端功能是我正在做的 basic.consume 。它与教程 2 中的循环法和延迟插件的代码大致相同。

代码的工作原理是这样的 - 接收函数接收到一条消息,该消息具有特定的延迟,将被发送回队列以供稍后使用。 发送消息延迟发送消息。

我面临的问题是 basic.consume,由于配置错误,我想我正在让多个消费者使用相同的消息。

我是否需要更改某些内容以实现多个消费者避免处理相同的请求。

class cities:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare("test-x", type="x-delayed-message", arguments={"x-delayed-type":"direct"})
        self.channel.queue_declare(queue='towns_queue')
        self.channel.queue_bind(queue="towns_queue", exchange="test-x", routing_key="towns_queue")
        self.logger = logging.getLogger("rabbitmq")
        self.logger.setLevel(logging.DEBUG)
        handler = RotatingFileHandler("logs/Receive.log", maxBytes=100*1024*1024, backupCount=100)
        self.logger.addHandler(handler)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.errlogger = logging.getLogger("Msg_timer_Error")
        self.errlogger.setLevel(logging.ERROR)
        errhandler = RotatingFileHandler("logs/towns_error.log", maxBytes=100*1024*1024, backupCount=100)
        self.errlogger.addHandler(errhandler)
        errformatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        errhandler.setFormatter(errformatter)

    def call_api(self,url):
        do something

    def server_log(self,msg,value):
        self.logger.info(msg+" : "+str(value))

    def send_msg(self,msg,delay):
        try:
            delay = delay - 40
            self.server_log("Delay Set : ",delay)
            delay *=1000
            self.channel.basic_publish(exchange='test-x',routing_key='towns_queue',body=msg,properties=pika.BasicProperties(headers={"x-delay"\
:delay},delivery_mode=1))
            self.server_log("Msg Sent",msg)
        except:
            self.errlogger.exception(msg)
            self.errlogger.exception(str(traceback.print_exc()))
            pass

    def receive(self,ch, method, properties, body):
        try:
            self.server_log("Received message : ",body)
            self.process(body)
            time.sleep(1)
            ch.basic_ack(delivery_tag = method.delivery_tag)
            return body
        except:
            self.errlogger.exception(body)
            self.errlogger.exception(str(traceback.print_exc()))
            pass

    def process(self,body):
        delay=self.call_api(body)
        if delay:
           self.send_msg(body,delay)

   def client(self):
        try:
            self.channel.basic_qos(prefetch_count=1)
            self.channel.basic_consume(self.receive,queue='towns_queue',no_ack=False)
            self.channel.start_consuming()
        except:
            self.errlogger.exception(str(traceback.print_exc()))
         pass

    def close(self):
         try:
            self.connection.close()
         except:
            self.errlogger.exception(str(traceback.print_exc()))
           pass

def multiprocess(num_process=4):
    procs = []
    objs = []
    for i in range(4):
        p = multiprocessing.Process(target=cities().client)
    p.start()
        procs.append(p)
    for p in procs:
        p.join()

if __name__ == "__main__":
    if len(sys.argv)> 1:
        num_proc = sys.argv[1]
        if num_proc.isdigit():
            num_proc = int(num_proc)
            multiprocess(num_proc)
        else:
            raise Exception ('num_of_processes needs to be an integer.')
    else:
        multiprocess()

感谢任何帮助。

更新:

这会不会有问题:

def client(self):
     try:
         self.channel.basic_qos(prefetch_count=1)
         self.channel.basic_consume(self.receive,queue='towns_queue',no_ack=False)
         self.channel.start_consuming()
     except:
         self.errlogger.exception(str(traceback.print_exc()))
         pass

教程中的示例为每条消息创建一个新连接。我尝试通过更改我的 send_message 函数来以这种方式工作,在该函数中我将创建一个新的客户端对象,该对象将创建一个新连接以在完成后发送和关闭连接。因此,一旦做出更改,它就可以正常工作了。

def send_msg(self,msg,delay=0):
    try:
        newClient=cities(0)
        if delay:
            delay = delay - 40
        self.server_log("Delay Set : ",delay)
        delay *=1000
        newClient.channel.basic_publish(exchange="test-x",routing_key='towns_queue',body=msg,properties=pika.BasicProperties(headers={"x-delay":delay},delivery_mode=2,))
        self.server_log("Msg Sent",msg)
        newClient.close()

    except:
        self.errlogger.exception(msg)
        self.errlogger.exception(str(traceback.print_exc()))
        pass

还必须更改 __init__ 构造函数以避免每次都创建记录器,但这不是问题的一部分。