Logstash - 如何通过 RabbitMQ 触发 Celery 任务

Logstash - How to trigger Celery tasks through RabbitMQ

谁能给我解释一下如何通过 Logstash 触发 Celery 任务? 可能吗?

如果我尝试通过 'php-amqplib' 库在 PHP 中执行此操作,它工作正常:(不使用 Logstash)

$connection = new AMQPStreamConnection(
    'rabbitmq.local',
    5672,
    'guest',
    'guest'
);
$channel = $connection->channel();
$channel->queue_declare(
    'celery',
    false,
    true,
    false,
    false
);
$taskId = rand(1000, 10000);
$props = array(
    'content_type' => 'application/json',
    'content_encoding' => 'utf-8',
);

$body = array(
    'task'      => 'process_next_task',
    'lang'      => 'py',
    'args'      => array('ktest' => 'vtest'),
    'kwargs'    => array('ktest' => 'vtest'),
    'origin'    => '@'.'mytest',
    'id'        => $taskId,
);

$msg = new AMQPMessage(json_encode($body), $props);
$channel->basic_publish($msg, 'celery', 'celery');

根据 Celery 文档:

http://docs.celeryproject.org/en/latest/internals/protocol.html

我正在尝试以 json 格式发送请求,这是我的 Logstash 过滤器:

ruby 
{
    remove_field => ['headers', '@timestamp', '@version', 'host', 'type']
    code => "
        event.set('properties', 
        {
            :content_type => 'application/json',
            :content_encoding => 'utf-8'
        })
    "
}

Celery 的答案是:

[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}}

基本上,Celery 无法解码我的消息格式或更好...我无法以 JSON 格式设置请求 :)

这让我发疯,提前感谢您提供任何线索:)

算了,这是我在Logstash中的输出插件

rabbitmq
            {
                key             => "celery"
                exchange        => "celery"
                exchange_type   => "direct"
                user            => "${RABBITMQ_USER}"
                password        => "${RABBITMQ_PASSWORD}"
                host            => "${RABBITMQ_HOST}"
                port            => "${RABBITMQ_PORT}"
                durable         => true
                persistent      => true
                codec           => json

            }

根据this question中提供的信息,你不能。

当你在玩 ruby 过滤器中的事件时,你实际上是在玩消息 body 中的内容,而你想设置rabbitmq headers 和消息的属性。

直到该功能 tackled,我不认为你能够实现它,当然除非你自己实现它。毕竟,该插件在 github.

上可用

正如 Olivier 所说,现在是不可能的,但我已经为官方项目创建了一个 pull request。 https://github.com/logstash-plugins/logstash-output-rabbitmq/pull/59

如果您正在寻找工作版本,请查看我的克隆:

https://github.com/useless-stuff/logstash-output-rabbitmq

你应该非常害怕那个代码:)

我离成为 Ruby 开发人员还很遥远

但它有效:)