Logstash - 如何通过 RabbitMQ 触发 Celery 任务
Logstash - How to trigger Celery tasks through RabbitMQ
谁能给我解释一下如何通过 Logstash 触发 Celery 任务?
可能吗?
如果我尝试通过 'php-amqplib' 库在 PHP 中执行此操作,它工作正常:(不使用 Logstash)
$connection = new AMQPStreamConnection(
'rabbitmq.local',
5672,
'guest',
'guest'
);
$channel = $connection->channel();
$channel->queue_declare(
'celery',
false,
true,
false,
false
);
$taskId = rand(1000, 10000);
$props = array(
'content_type' => 'application/json',
'content_encoding' => 'utf-8',
);
$body = array(
'task' => 'process_next_task',
'lang' => 'py',
'args' => array('ktest' => 'vtest'),
'kwargs' => array('ktest' => 'vtest'),
'origin' => '@'.'mytest',
'id' => $taskId,
);
$msg = new AMQPMessage(json_encode($body), $props);
$channel->basic_publish($msg, 'celery', 'celery');
根据 Celery 文档:
http://docs.celeryproject.org/en/latest/internals/protocol.html
我正在尝试以 json 格式发送请求,这是我的 Logstash 过滤器:
ruby
{
remove_field => ['headers', '@timestamp', '@version', 'host', 'type']
code => "
event.set('properties',
{
:content_type => 'application/json',
:content_encoding => 'utf-8'
})
"
}
Celery 的答案是:
[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}}
基本上,Celery 无法解码我的消息格式或更好...我无法以 JSON 格式设置请求 :)
这让我发疯,提前感谢您提供任何线索:)
算了,这是我在Logstash中的输出插件
rabbitmq
{
key => "celery"
exchange => "celery"
exchange_type => "direct"
user => "${RABBITMQ_USER}"
password => "${RABBITMQ_PASSWORD}"
host => "${RABBITMQ_HOST}"
port => "${RABBITMQ_PORT}"
durable => true
persistent => true
codec => json
}
根据this question中提供的信息,你不能。
当你在玩 ruby 过滤器中的事件时,你实际上是在玩消息 body 中的内容,而你想设置rabbitmq headers 和消息的属性。
直到该功能 tackled,我不认为你能够实现它,当然除非你自己实现它。毕竟,该插件在 github.
上可用
正如 Olivier 所说,现在是不可能的,但我已经为官方项目创建了一个 pull request。
https://github.com/logstash-plugins/logstash-output-rabbitmq/pull/59
如果您正在寻找工作版本,请查看我的克隆:
https://github.com/useless-stuff/logstash-output-rabbitmq
你应该非常害怕那个代码:)
我离成为 Ruby 开发人员还很遥远
但它有效:)
谁能给我解释一下如何通过 Logstash 触发 Celery 任务? 可能吗?
如果我尝试通过 'php-amqplib' 库在 PHP 中执行此操作,它工作正常:(不使用 Logstash)
$connection = new AMQPStreamConnection(
'rabbitmq.local',
5672,
'guest',
'guest'
);
$channel = $connection->channel();
$channel->queue_declare(
'celery',
false,
true,
false,
false
);
$taskId = rand(1000, 10000);
$props = array(
'content_type' => 'application/json',
'content_encoding' => 'utf-8',
);
$body = array(
'task' => 'process_next_task',
'lang' => 'py',
'args' => array('ktest' => 'vtest'),
'kwargs' => array('ktest' => 'vtest'),
'origin' => '@'.'mytest',
'id' => $taskId,
);
$msg = new AMQPMessage(json_encode($body), $props);
$channel->basic_publish($msg, 'celery', 'celery');
根据 Celery 文档:
http://docs.celeryproject.org/en/latest/internals/protocol.html
我正在尝试以 json 格式发送请求,这是我的 Logstash 过滤器:
ruby
{
remove_field => ['headers', '@timestamp', '@version', 'host', 'type']
code => "
event.set('properties',
{
:content_type => 'application/json',
:content_encoding => 'utf-8'
})
"
}
Celery 的答案是:
[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}}
基本上,Celery 无法解码我的消息格式或更好...我无法以 JSON 格式设置请求 :)
这让我发疯,提前感谢您提供任何线索:)
算了,这是我在Logstash中的输出插件
rabbitmq
{
key => "celery"
exchange => "celery"
exchange_type => "direct"
user => "${RABBITMQ_USER}"
password => "${RABBITMQ_PASSWORD}"
host => "${RABBITMQ_HOST}"
port => "${RABBITMQ_PORT}"
durable => true
persistent => true
codec => json
}
根据this question中提供的信息,你不能。
当你在玩 ruby 过滤器中的事件时,你实际上是在玩消息 body 中的内容,而你想设置rabbitmq headers 和消息的属性。
直到该功能 tackled,我不认为你能够实现它,当然除非你自己实现它。毕竟,该插件在 github.
上可用正如 Olivier 所说,现在是不可能的,但我已经为官方项目创建了一个 pull request。 https://github.com/logstash-plugins/logstash-output-rabbitmq/pull/59
如果您正在寻找工作版本,请查看我的克隆:
https://github.com/useless-stuff/logstash-output-rabbitmq
你应该非常害怕那个代码:)
我离成为 Ruby 开发人员还很遥远
但它有效:)