多进程兔子消费者
multiprocess rabbit consumer
提前致谢。
我是多处理的新手。
我创建了一个进程,我想通过该进程同时通过 Rabbit MQueue 使用数据,但它一次运行一个进程。
def start_consum(queue_name):
channel.basic_consume(func, queue=queue_name)
channel.start_consuming()
def process_start(number):
from multiprocessing import Process
events = ["ev1","ev2","ev3"]
for process in range(number):
for event in events:
proc = Process(target= start_consum(event))
proc.daemon = True
proc.start()
process_start(10)
在上面的代码中,它开始消费第一个事件,然后开始第二个事件。
您可以为此创建一个话题。
class Threaded_worker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.credentials = pika.PlainCredentials('', '')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(credentials=self.credentials,host=))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
events = ["ev1","ev2","ev3"]
for event in events:
self.channel.basic_consume(func, queue=event)
def run(self):
print 'start consuming'
self.channel.start_consuming()
def thread_start(numberofthreads):
for _ in range(numberofthreads):
td = Threaded_worker()
td.setDaemon(True)
td.start()
提前致谢。 我是多处理的新手。 我创建了一个进程,我想通过该进程同时通过 Rabbit MQueue 使用数据,但它一次运行一个进程。
def start_consum(queue_name):
channel.basic_consume(func, queue=queue_name)
channel.start_consuming()
def process_start(number):
from multiprocessing import Process
events = ["ev1","ev2","ev3"]
for process in range(number):
for event in events:
proc = Process(target= start_consum(event))
proc.daemon = True
proc.start()
process_start(10)
在上面的代码中,它开始消费第一个事件,然后开始第二个事件。
您可以为此创建一个话题。
class Threaded_worker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.credentials = pika.PlainCredentials('', '')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(credentials=self.credentials,host=))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
events = ["ev1","ev2","ev3"]
for event in events:
self.channel.basic_consume(func, queue=event)
def run(self):
print 'start consuming'
self.channel.start_consuming()
def thread_start(numberofthreads):
for _ in range(numberofthreads):
td = Threaded_worker()
td.setDaemon(True)
td.start()