Python Flask Pika Consumer (RabbitMQ)

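I have two small Python Flask apps:

  1. Appone --> Producer
  2. Apptwo --> Consumer

Both run in separate Docker containers, orchestrated by docker-compose.

I'm not getting any data from the producer to the consumer. Even when I call start_consuming() in apptwo, the producer cannot send any data to the RabbitMQ broker. Maybe someone can help me. Thanks a lot.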

docker-compose:

version: '3'
services:

  appone:
    container_name: appone
    restart: always
    build:
      context: ./appone
      dockerfile: Dockerfile
    environment:
      FLASK_APP: ./app.py        
    volumes:
      - './appone:/code/:cached'
    ports:
      - "5001:5001"

  apptwo:
    container_name: apptwo
    restart: always
    build:
      context: ./apptwo
      dockerfile: Dockerfile
    environment:
      FLASK_DEBUG: 1
      FLASK_APP: ./app.py         
    volumes:
      - ./apptwo:/code:cached 
    ports:
      - "5002:5002"     

  rabbitmq:
    image: "rabbitmq:3-management"
    hostname: "rabbit"
    ports:
      - "15672:15672"
      - "5672:5672"
    labels:
      NAME: "rabbitmq"
    volumes:
      - ./rabbitmq/rabbitmq-isolated.conf:/etc/rabbitmq/rabbitmq.config

appone (producer)

from flask import Flask
from flask_restful import Resource, Api
import pika

app = Flask(__name__)
api = Api(app)

app.config['DEBUG'] = True

message = "Hello World, its me appone"


class HelloWorld(Resource):
    def get(self):
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='rabbitmq'))
        channel = connection.channel()

        channel.queue_declare(queue='hello', durable=True)

        channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2))

        connection.close()

        return {'message': message}


api.add_resource(HelloWorld, '/api/appone/post')

if __name__ == '__main__':
    # Development
    app.run(host="0.0.0.0", port=5001)

apptwo (consumer)

from flask import Flask
from flask_restful import Resource, Api
import pika
from threading import Thread

app = Flask(__name__)
api = Api(app)

app.config['DEBUG'] = True

data = []

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq'))

channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    data.append(body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

thread = Thread(channel.start_consuming())
thread.start()

class HelloWorld(Resource):
    def get(self):
        return {'message': data}

api.add_resource(HelloWorld, '/api/apptwo/get')

if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0", port=5002)

Goal: in this simple example, I just want to receive the data in apptwo and store it in the data list...

Thanks again!!

In apptwo (consumer):

thread = Thread(channel.start_consuming())
thread.start()

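Here the Thread constructor is never reached: Python evaluates the argument first, so channel.start_consuming() is called in the main thread, and that call blocks. Everything after that line, including the Flask routes and app.run(), never executes. Changing your code to the following might help.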

thread = Thread(target = channel.start_consuming)
thread.start()
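
To see the difference without a broker, here is a minimal sketch that uses only the standard library. blocking_loop is a hypothetical stand-in for channel.start_consuming() (it is not part of pika); it blocks until an event is set, the way start_consuming blocks until consuming stops.

```python
import threading


def blocking_loop(stop_event, log):
    """Hypothetical stand-in for channel.start_consuming(): blocks until stopped."""
    log.append("loop started in " + threading.current_thread().name)
    stop_event.wait()  # blocks here, like a consume loop waiting for messages
    log.append("loop stopped")


stop = threading.Event()
log = []

# Wrong: Thread(blocking_loop(stop, log)) would call blocking_loop right here,
# in the main thread, and block before the Thread object is ever constructed.

# Right: pass the callable itself; it is only invoked inside the new thread.
worker = threading.Thread(
    target=blocking_loop, args=(stop, log), name="consumer", daemon=True
)
worker.start()

# The main thread is not blocked and can carry on (for example, app.run()).
log.append("main thread continues")

# Shut the stand-in loop down so the sketch terminates cleanly.
stop.set()
worker.join(timeout=5)
```

daemon=True is my own choice here, so that the worker does not keep the process alive on shutdown. Two further things may be worth checking in apptwo, although I have not run your setup: pika documents its connections as not thread-safe, so it is probably safer to create the connection and channel inside the function the thread runs; and body arrives as bytes, which Flask cannot serialize to JSON, so something like data.append(body.decode()) may be needed.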