shiftonelabs / laravel-sqs-fifo-queue
Adds a Laravel queue driver for Amazon SQS FIFO queues.
Installs: 3 534 827
Dependents: 3
Suggesters: 0
Security: 0
Stars: 150
Watchers: 10
Forks: 61
Open Issues: 0
Requires
- php: >=8.0.2
- aws/aws-sdk-php: ~3.0
- illuminate/queue: >=9.0
- illuminate/support: >=9.0
- ramsey/uuid: >=4.2.2
Requires (Dev)
- illuminate/mail: >=9.0
- illuminate/notifications: >=9.0
- mockery/mockery: ~1.3
- phpunit/phpunit: ~9.3 || ~10.0
- shiftonelabs/codesniffer-standard: 0.*
- squizlabs/php_codesniffer: 3.*
- vlucas/phpdotenv: ~5.4
README
This Laravel/Lumen package provides a queue driver for Amazon's SQS FIFO queues. While Laravel works with Amazon's SQS standard queues out of the box, FIFO queues are slightly different and are not handled properly by Laravel. That is where this package comes in.
Versions
This package has been tested on Laravel 4.1 through Laravel 11.x, though it may continue to work on later versions as they are released. This section will be updated to reflect the versions on which the package has actually been tested.
This readme has been updated to show information for the most currently supported versions (9.x - 11.x). For Laravel 4.1 through Laravel 8.x, view the 2.x branch.
Install
Via Composer
$ composer require shiftonelabs/laravel-sqs-fifo-queue
Once composer has been updated and the package has been installed, the service provider will need to be loaded.
Laravel 9.x, 10.x, 11.x
This package uses auto package discovery. The service provider will automatically be registered.
Lumen 9.x, 10.x, 11.x
Open bootstrap/app.php
and add following line under the "Register Service Providers" section:
$app->register(ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class);
Configuration
Laravel/Lumen 9.x, 10.x, 11.x
If using Lumen, create a config
directory in your project root if you don't already have one. Next, copy vendor/laravel/lumen-framework/config/queue.php
to config/queue.php
.
Now, for both Laravel and Lumen, open config/queue.php
and add the following entry to the connections
array.
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_FIFO_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
'queue' => env('SQS_FIFO_QUEUE', 'default.fifo'),
'suffix' => env('SQS_FIFO_SUFFIX'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'after_commit' => false,
'group' => 'default',
'deduplicator' => env('SQS_FIFO_DEDUPLICATOR', 'unique'),
'allow_delay' => env('SQS_FIFO_ALLOW_DELAY', false),
],
Example .env file:
AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQRST
AWS_SECRET_ACCESS_KEY=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_FIFO_PREFIX=https://sqs.us-east-1.amazonaws.com/123456789012
SQS_FIFO_QUEUE=queuename.fifo
SQS_FIFO_SUFFIX=-staging
AWS_DEFAULT_REGION=us-east-1
SQS_FIFO_DEDUPLICATOR=unique
SQS_FIFO_ALLOW_DELAY=false
If you'd like this to be the default connection, also set QUEUE_CONNECTION=sqs-fifo
in the .env
file.
Capsule
If using the illuminate\queue
component Capsule outside of Lumen/Laravel:
use Illuminate\Queue\Capsule\Manager as Queue; use ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider; $queue = new Queue; $queue->addConnection([ 'driver' => 'sqs-fifo', 'key' => 'your-public-key', // ex: ABCDEFGHIJKLMNOPQRST 'secret' => 'your-secret-key', // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef 'prefix' => 'your-prefix', // ex: https://sqs.us-east-1.amazonaws.com/your-account-id 'queue' => 'your-queue-name', // ex: queuename.fifo 'suffix' => 'your-suffix', // ex: -staging 'region' => 'your-queue-region', // ex: us-east-1 'after_commit' => false, 'group' => 'default', 'deduplicator' => 'unique', 'allow_delay' => false, ], 'sqs-fifo'); // Make this Capsule instance available globally via static methods... (optional) $queue->setAsGlobal(); // Register the 'queue' alias in the Container, then register the SQS FIFO provider. $app = $queue->getContainer(); $app->instance('queue', $queue->getQueueManager()); (new LaravelSqsFifoQueueServiceProvider($app))->register();
Credentials
The key
and secret
config options may be omitted if using one of the alternative options for providing AWS credentials (e.g. using an AWS credentials file). More information about alternative options is available in the AWS PHP SDK guide here.
AWS STS Session Token
The 'token' => env('AWS_SESSION_TOKEN'),
config option may be added if you need to specify an AWS STS temporary session token in the config. This is needed for some specific environments, such as AWS Lambda.
Queue Suffix
The suffix
config option is used to support queues for different environments without having to specify the environment suffix when using the queue. For example, if you have an emails-staging.fifo
queue and an emails-production.fifo
queue, you can set the suffix
config to -staging
or -production
based on the environment, and your code can continue to use emails.fifo
without needing to know the environment. So, Job::dispatch()->onQueue('emails.fifo')
will use either the emails-staging.fifo
or the emails-production.fifo
queue, depending on the suffix
defined in the config.
NB: SQS FIFO queues must end with a .fifo
suffix. As seen in the example above, any suffix
defined in the config will come before the required .fifo
suffix. Do not specify .fifo
in the suffix config or the queue name will not generate correctly.
Jobs & Database Transactions
The after_commit
configuration option is used to govern how jobs are handled when queued in the middle of a database transaction. When set to true
, this option ensures that jobs that are queued during a database transaction are not actually dispatched until after the database transaction is committed. If the transaction is rolled back, the job will be discarded and not dispatched. If there are no active database transactions, the job will be dispatched immediately.
Queued Event Listeners
At this time, queued event listeners cannot be handled by this package. The implementation for this would require taking over Laravel's event dispatcher, which is not something this package is prepared to do at this time. To work around this, you would need to convert your queued listener to not be queued, but to then dispatch a new queued job that handles the work.
Usage
For the most part, usage of this queue driver is the same as the built in queue drivers. There are, however, a few extra things to consider when working with Amazon's SQS FIFO queues.
Message Groups
In addition to being able to have multiple queue names for each connection, an SQS FIFO queue also allows one to have multiple "message groups" for each FIFO queue. These message groups are used to group related jobs together, and jobs are processed in FIFO order per group. This is important, as your queue performance may depend on being able to assign message groups properly. If you have 100 jobs in the queue, and they all belong to one message group, then only one queue worker will be able to process the jobs at a time. If, however, they can logically be split across 5 message groups, then you could have 5 queue workers processing the jobs from the queues (one per group). The FIFO ordering is per message group.
Currently, by default, all queued jobs will be lumped into one group, as defined in the configuration file. In the configuration provided above, all queued jobs would be sent as part of the default
group. The group can be changed per job using the onMessageGroup()
method, which will be explained more below.
The group id must not be empty, must not be more than 128 characters, and can contain alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
).
Additionally, AWS enforces that the group id must be a string type. While typehinted in docblocks, variable types are not enforced in this package. To prevent AWS errors, the assigned message group will be converted to a string using the strval()
method before being sent to AWS.
In a future release, the message group will be able to be assigned to a function, like the deduplicator below.
Deduplication
When sending jobs to the SQS FIFO queue, Amazon requires a way to determine if the job is a duplicate of a job already in the queue. SQS FIFO queues have a 5 minute deduplication interval, meaning that if a duplicate message is sent within the interval, it will be accepted successfully (no errors), but it will not be delivered or processed.
Determining duplicate messages is generally handled in one of two ways: either all messages are considered unique regardless of content, or messages are considered duplicates if they have the same content.
This package handles deduplication via the deduplicator
configuration option.
To have all messages considered unique, set the deduplicator
to unique
.
To have messages with the same content considered duplicates, there are two options, depending on how the FIFO queue has been configured. If the FIFO queue has been setup in Amazon with the Content-Based Deduplication
feature enabled, then the deduplicator
should be set to sqs
. This tells the connection to rely on Amazon to determine content uniqueness. However, if the Content-Based Deduplication
feature is disabled, the deduplicator
should be set to content
. Note, if Content-Based Deduplication
is disabled, and the deduplicator
is set to sqs
, this will generate an error when attempting to send a job to the queue.
To summarize:
sqs
- This is used when messages with the same content should be considered duplicates andContent-Based Deduplication
is enabled on the SQS FIFO queue.content
- This is used when messages with the same content should be considered duplicates butContent-Based Deduplication
is disabled on the SQS FIFO queue.unique
- This is used when all messages should be considered unique, regardless of content.
If there is a need for a different deduplication algorithm, custom deduplication methods can be registered in the container.
Finally, by default, all queued jobs will use the deduplicator defined in the configuration file. This can be changed per job using the withDeduplicator()
method.
Delayed Jobs
SQS FIFO queues do not support per-message delays, only per-queue delays. The desired delay is defined on the queue itself when the queue is setup in the Amazon Console. Attempting to set a delay on a job sent to a FIFO queue will have no effect. In order to delay a job, you can push()
the job to an SQS FIFO queue that has been defined with a delivery delay.
Since per-message delays are not supported, using the later()
method to push a job to an SQS FIFO queue will throw a BadMethodCallException
exception by default. However, this behavior can be changed using the allow_delay
config option.
Setting the allow_delay
config option to true
for a queue will allow the later()
method to push a job on that queue without an exception. However, the delay parameter sent to the later()
method is completely ignored since the delay time is defined in SQS on the queue itself.
Advanced Usage
Per-Job Group and Deduplicator
If you need to change the group or the deduplicator for a specific job, you will need access to the onMessageGroup()
and withDeduplicator()
methods. These methods are provided through the ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable
trait. Once you add this trait to your job class, you can change the group and/or the deduplicator for that specific job without affecting any other jobs on the queue.
Code Example
Job:
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class ProcessCoin implements ShouldQueue { use InteractsWithQueue, Queueable, SqsFifoQueueable, SerializesModels; // }
Usage:
dispatch( (new \App\Jobs\ProcessCoin()) ->onMessageGroup('quarter') ->withDeduplicator('unique') );
Notification:
<?php namespace App\Notifications; use Illuminate\Bus\Queueable; use Illuminate\Notifications\Notification; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class InvoicePaid extends Notification implements ShouldQueue { use Queueable, SqsFifoQueueable; // }
Usage:
$user->notify( (new InvoicePaid($invoice))->onMessageGroup($invoice->id) );
Mailable:
<?php namespace App\Mail; use Illuminate\Bus\Queueable; use Illuminate\Mail\Mailable; use Illuminate\Queue\SerializesModels; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class OrderShipped extends Mailable implements ShouldQueue { use Queueable, SerializesModels, SqsFifoQueueable; // }
Usage:
Mail::to($request->user()) ->cc($moreUsers) ->bcc($evenMoreUsers) ->queue(new OrderShipped($order)->onMessageGroup($order->number));
Custom Deduplicator
The deduplicators work by generating a deduplication id that is sent to the queue. If two messages generate the same deduplication id, the second message is considered a duplicate, and the message will not be delivered if it is within the 5 minute deduplication interval.
If you have some custom logic that needs to be used to generate the deduplication id, you can register your own custom deduplicator. The deduplicators are stored in the IoC container with the prefix queue.sqs-fifo.deduplicator
. So, for example, the unique
deduplicator is aliased to queue.sqs-fifo.deduplicator.unique
.
Custom deduplicators are created by registering a new prefixed alias in the IoC. This alias should resolve to a new object instance that implements the ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator
contract. You can either define a new class that implements this contract, or you can create a new ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback
instance, which takes a Closure
that performs the deduplication logic. The defined Closure
should take two parameters: $payload
and $queue
, where $payload
is the json_encoded()
message to send to the queue, and $queue
is the name of the queue to which the message is being sent. The generated id must not be more than 128 characters, and can contain alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
). Additionally, AWS requires a string type. To prevent AWS errors, the result will be passed through strval()
before being sent to AWS.
So, for example, if you wanted to create a random
deduplicator that would randomly select some jobs to be duplicates, you could add the following line in the register()
method of your AppServiceProvider
:
$this->app->bind('queue.sqs-fifo.deduplicator.random', function ($app) { return new \ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback(function ($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); }); }
Or, if you prefer to create a new class, your class would look like this:
namespace App\Deduplicators; use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator; class Random implements Deduplicator { public function generate($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); } }
And you could register that class in your AppServiceProvider
like this:
$this->app->bind('queue.sqs-fifo.deduplicator.random', App\Deduplicators\Random::class);
With this alias registered, you could update the deduplicator
key in your configuration to use the value random
, or you could set the deduplicator on individual jobs by calling withDeduplicator('random')
on the job.
Contributing
Contributions are welcome. Please see CONTRIBUTING for details.
Security
If you discover any security related issues, please email patrick@shiftonelabs.com instead of using the issue tracker.
Credits
License
The MIT License (MIT). Please see License File for more information.