如何使用 Laravel 队列拦截 S3 上的新文件?
How to intercept a new file on S3 using Laravel Queues?
我有一个 S3 存储桶 mybucket
,我想在将新文件复制到该存储桶中时执行一些操作。对于通知,我想使用 SQS 队列 notifiqueue
,因为我的目标是使用 Laravel
访问该队列
因为我是在 CloudFormation
中创建我的基础设施,所以资源是这样创建的:
NotificationQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 120
QueueName: 'NotificationQueue'
DataGateBucket:
Type: AWS::S3::Bucket
Properties:
AccessControl: BucketOwnerFullControl
BucketName: 'mybucket'
NotificationConfiguration:
QueueConfigurations:
- Event: 's3:ObjectCreated:*'
Queue: !GetAtt NotificationQueue.Arn
每次在存储桶中保存新文件时,S3 会自动在 SQS 中创建一个通知。
遗憾的是,负载的格式与 Laravel 标准作业负载不兼容,如果我 运行 NotificationQueue
上的工作进程,我会收到此错误:
local.ERROR: Undefined index: job {"exception":"[object] (ErrorException(code: 0): Undefined index: job at .../vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:273)
为了提供更完整的指示,这是我在通知中得到的内容(将 JSON 转换为 PHP 数组后)
array:1 [
"Records" => array:1 [
0 => array:9 [
"eventVersion" => "2.1"
"eventSource" => "aws:s3"
"awsRegion" => "eu-central-1"
"eventTime" => "2019-04-23T17:02:41.308Z"
"eventName" => "ObjectCreated:Put"
"userIdentity" => array:1 [
"principalId" => "AWS:XXXXXXXXXXXXXXXXXX"
]
"requestParameters" => array:1 [
"sourceIPAddress" => "217.64.198.7"
]
"responseElements" => array:2 [
"x-amz-request-id" => "602CE18B8DE0BE5C"
"x-amz-id-2" => "wA/A3Jl2XpoxBWJEgQzy11s6O28Cz9Wc6pVi6Ho1vnIrOjqsWkGozlUmqRdpYAfub0MqdF8d/YI="
]
"s3" => array:4 [
"s3SchemaVersion" => "1.0"
"configurationId" => "0d4eaa75-5730-495e-b6d4-368bf3690f30"
"bucket" => array:3 [
"name" => "mybucket"
"ownerIdentity" => array:1 [
"principalId" => "XXXXXXXXXXXXXXXXXX"
]
"arn" => "arn:aws:s3:::mybucket"
]
"object" => array:4 [
"key" => "dirName/myFile.txt"
"size" => 1991721
"eTag" => "824a20edad0091027b5d0fa6d78bb24f"
"sequencer" => "005CBF452E30AAC02A"
]
]
]
]
]
使用 Laravel 访问通知以便我可以触发一些其他选项来响应文件上传的有效/最佳/正确方法是什么?
我找到了一种获得所需行为的方法,但我不确定这是最好的方法,所以我 post 在这里,也许可以给我反馈。
当我们谈论 Laravel 队列时,很多配置来自 app.php
,特别是来自 Provider
部分。我设法添加了我需要覆盖 Original QueueServiceProvider
class 并替换它的行为:
// Here is the original Provider Class
//Illuminate\Queue\QueueServiceProvider::class,
// Here is the overridden Provider
\App\Providers\QueueServiceProvider::class,
新的QueueServiceProvider
class如下:
<?php
namespace App\Providers;
use App\Jobs\SqsNotifications\SqsConnector;
class QueueServiceProvider extends \Illuminate\Queue\QueueServiceProvider
{
/**
* Register the Amazon SQS queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerSqsNotifConnector($manager)
{
$manager->addConnector('sqsNotif', function () {
return new SqsConnector();
});
}
public function registerConnectors($manager){
parent::registerConnectors($manager);
// Add the custom SQS notification connector
$this->registerSqsNotifConnector($manager);
}
}
注意新连接器 sqsNotif
,需要将其添加到 queue.php
'sqsNotif' => [
'driver' => 'sqsNotif',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX', 'https://sqs.eu-central-1.amazonaws.com/your-account'),
'queue' => env('SQS_QUEUE', 'your-queue-name'),
'region' => env('AWS_DEFAULT_REGION', 'eu-central-1'),
],
在新的QueueServiceProvider
中我们只是注册了一个额外的连接器,其代码是:
<?php
namespace App\Jobs\SqsNotifications;
use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;
class SqsConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
$config = $this->getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
}
return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
);
}
}
SqsQueue也重新定义了,这样:
<?php
namespace App\Jobs\SqsNotifications;
class SqsQueue extends \Illuminate\Queue\SqsQueue
{
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue = $this->getQueue($queue),
'AttributeNames' => ['ApproximateReceiveCount'],
]);
if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
return new SqsJob(
$this->container, $this->sqs, $response['Messages'][0],
$this->connectionName, $queue
);
}
}
}
最后缺少的一块是SqsJob,定义如下:
<?php
namespace App\Jobs\SqsNotifications;
use Illuminate\Queue\Jobs\JobName;
/**
* Class SqsJob
* @package App\Jobs\SqsNotifications
*
* Alternate SQS job that is used in case of S3 notifications
*/
class SqsJob extends \Illuminate\Queue\Jobs\SqsJob
{
/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
$bucketName = '';
// Define the name of the Process based on the bucket name
switch($this->payload()['Records'][0]['s3']['bucket']['name']){
case 'mybucket':
$bucketName = 'NewMyBucketFileJob';
break;
}
return $bucketName;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
// Mimic the original behavior with a different payload
$payload = $this->payload();
[$class, $method] = JobName::parse('\App\Jobs\' . $this->getName() . '@handle');
($this->instance = $this->resolve($class))->{$method}($payload);
// The Job wasn't automatically deleted, so we need to delete it manually once the process went fine
$this->delete();
}
}
此时,我只需要定义处理作业,例如下面的一个,在 NewMyBucketFileJob
:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessDataGateNewFile implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
}
/**
* Execute the job.
*
* @return void
*/
public function handle($data)
{
// Print the whole data structure
print_r($data);
// Or just the name of the uploaded file
print_r($data['Records'][0]['s3']['object']['key']);
}
}
这个过程有效,所以这是一个解决方案,但涉及很多 class 扩展,并且它非常脆弱,以防在未来的版本中更改内部队列实现。老实说,我想知道是否有更简单或更强大的东西
其实我也有类似的问题。 SQS 用于在不同系统之间传输数据和触发事件。一些可以是 S3 事件,另一个可以是 phyton lambda 函数。并非所有这些都在我手中,因此我无法更改有效负载。那么用 laravel 收听新的相关任务的最佳方法是什么?
我看到有一些解决方案可以像这样通过 sqs 将 laravel 任务带到 lambda:https://github.com/brefphp/laravel-bridge
我看到通过 worker 启用的某些部分可以触发 laravel,但我还没有测试过:https://github.com/dusterio/laravel-aws-worker
这似乎有点过时,但可能符合您的想法:https://github.com/dusterio/laravel-aws-worker
我有一个 S3 存储桶 mybucket
,我想在将新文件复制到该存储桶中时执行一些操作。对于通知,我想使用 SQS 队列 notifiqueue
,因为我的目标是使用 Laravel
因为我是在 CloudFormation
中创建我的基础设施,所以资源是这样创建的:
NotificationQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 120
QueueName: 'NotificationQueue'
DataGateBucket:
Type: AWS::S3::Bucket
Properties:
AccessControl: BucketOwnerFullControl
BucketName: 'mybucket'
NotificationConfiguration:
QueueConfigurations:
- Event: 's3:ObjectCreated:*'
Queue: !GetAtt NotificationQueue.Arn
每次在存储桶中保存新文件时,S3 会自动在 SQS 中创建一个通知。
遗憾的是,负载的格式与 Laravel 标准作业负载不兼容,如果我 运行 NotificationQueue
上的工作进程,我会收到此错误:
local.ERROR: Undefined index: job {"exception":"[object] (ErrorException(code: 0): Undefined index: job at .../vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:273)
为了提供更完整的指示,这是我在通知中得到的内容(将 JSON 转换为 PHP 数组后)
array:1 [
"Records" => array:1 [
0 => array:9 [
"eventVersion" => "2.1"
"eventSource" => "aws:s3"
"awsRegion" => "eu-central-1"
"eventTime" => "2019-04-23T17:02:41.308Z"
"eventName" => "ObjectCreated:Put"
"userIdentity" => array:1 [
"principalId" => "AWS:XXXXXXXXXXXXXXXXXX"
]
"requestParameters" => array:1 [
"sourceIPAddress" => "217.64.198.7"
]
"responseElements" => array:2 [
"x-amz-request-id" => "602CE18B8DE0BE5C"
"x-amz-id-2" => "wA/A3Jl2XpoxBWJEgQzy11s6O28Cz9Wc6pVi6Ho1vnIrOjqsWkGozlUmqRdpYAfub0MqdF8d/YI="
]
"s3" => array:4 [
"s3SchemaVersion" => "1.0"
"configurationId" => "0d4eaa75-5730-495e-b6d4-368bf3690f30"
"bucket" => array:3 [
"name" => "mybucket"
"ownerIdentity" => array:1 [
"principalId" => "XXXXXXXXXXXXXXXXXX"
]
"arn" => "arn:aws:s3:::mybucket"
]
"object" => array:4 [
"key" => "dirName/myFile.txt"
"size" => 1991721
"eTag" => "824a20edad0091027b5d0fa6d78bb24f"
"sequencer" => "005CBF452E30AAC02A"
]
]
]
]
]
使用 Laravel 访问通知以便我可以触发一些其他选项来响应文件上传的有效/最佳/正确方法是什么?
我找到了一种获得所需行为的方法,但我不确定这是最好的方法,所以我 post 在这里,也许可以给我反馈。
当我们谈论 Laravel 队列时,很多配置来自 app.php
,特别是来自 Provider
部分。我设法添加了我需要覆盖 Original QueueServiceProvider
class 并替换它的行为:
// Here is the original Provider Class
//Illuminate\Queue\QueueServiceProvider::class,
// Here is the overridden Provider
\App\Providers\QueueServiceProvider::class,
新的QueueServiceProvider
class如下:
<?php
namespace App\Providers;
use App\Jobs\SqsNotifications\SqsConnector;
class QueueServiceProvider extends \Illuminate\Queue\QueueServiceProvider
{
/**
* Register the Amazon SQS queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerSqsNotifConnector($manager)
{
$manager->addConnector('sqsNotif', function () {
return new SqsConnector();
});
}
public function registerConnectors($manager){
parent::registerConnectors($manager);
// Add the custom SQS notification connector
$this->registerSqsNotifConnector($manager);
}
}
注意新连接器 sqsNotif
,需要将其添加到 queue.php
'sqsNotif' => [
'driver' => 'sqsNotif',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX', 'https://sqs.eu-central-1.amazonaws.com/your-account'),
'queue' => env('SQS_QUEUE', 'your-queue-name'),
'region' => env('AWS_DEFAULT_REGION', 'eu-central-1'),
],
在新的QueueServiceProvider
中我们只是注册了一个额外的连接器,其代码是:
<?php
namespace App\Jobs\SqsNotifications;
use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;
class SqsConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
$config = $this->getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
}
return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
);
}
}
SqsQueue也重新定义了,这样:
<?php
namespace App\Jobs\SqsNotifications;
class SqsQueue extends \Illuminate\Queue\SqsQueue
{
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue = $this->getQueue($queue),
'AttributeNames' => ['ApproximateReceiveCount'],
]);
if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
return new SqsJob(
$this->container, $this->sqs, $response['Messages'][0],
$this->connectionName, $queue
);
}
}
}
最后缺少的一块是SqsJob,定义如下:
<?php
namespace App\Jobs\SqsNotifications;
use Illuminate\Queue\Jobs\JobName;
/**
* Class SqsJob
* @package App\Jobs\SqsNotifications
*
* Alternate SQS job that is used in case of S3 notifications
*/
class SqsJob extends \Illuminate\Queue\Jobs\SqsJob
{
/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
$bucketName = '';
// Define the name of the Process based on the bucket name
switch($this->payload()['Records'][0]['s3']['bucket']['name']){
case 'mybucket':
$bucketName = 'NewMyBucketFileJob';
break;
}
return $bucketName;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
// Mimic the original behavior with a different payload
$payload = $this->payload();
[$class, $method] = JobName::parse('\App\Jobs\' . $this->getName() . '@handle');
($this->instance = $this->resolve($class))->{$method}($payload);
// The Job wasn't automatically deleted, so we need to delete it manually once the process went fine
$this->delete();
}
}
此时,我只需要定义处理作业,例如下面的一个,在 NewMyBucketFileJob
:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessDataGateNewFile implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
}
/**
* Execute the job.
*
* @return void
*/
public function handle($data)
{
// Print the whole data structure
print_r($data);
// Or just the name of the uploaded file
print_r($data['Records'][0]['s3']['object']['key']);
}
}
这个过程有效,所以这是一个解决方案,但涉及很多 class 扩展,并且它非常脆弱,以防在未来的版本中更改内部队列实现。老实说,我想知道是否有更简单或更强大的东西
其实我也有类似的问题。 SQS 用于在不同系统之间传输数据和触发事件。一些可以是 S3 事件,另一个可以是 phyton lambda 函数。并非所有这些都在我手中,因此我无法更改有效负载。那么用 laravel 收听新的相关任务的最佳方法是什么?
我看到有一些解决方案可以像这样通过 sqs 将 laravel 任务带到 lambda:https://github.com/brefphp/laravel-bridge 我看到通过 worker 启用的某些部分可以触发 laravel,但我还没有测试过:https://github.com/dusterio/laravel-aws-worker 这似乎有点过时,但可能符合您的想法:https://github.com/dusterio/laravel-aws-worker