mcfedr / queue-manager-bundle
A bundle for managing job queues
Installs: 89 755
Dependents: 7
Suggesters: 0
Security: 0
Stars: 9
Watchers: 3
Forks: 11
Open Issues: 5
Type:symfony-bundle
Requires
- php: >=8.0
- ext-json: *
- nesbot/carbon: ^1|^2|^3
- ramsey/uuid: ^3.7|^4.1
- symfony/framework-bundle: ^5.0|^6.0|^7.0
Requires (Dev)
- aws/aws-sdk-php: ^3.15
- doctrine/doctrine-bundle: ^1.6|^2.0
- doctrine/orm: ^2.5
- friendsofphp/php-cs-fixer: ^3.0
- google/cloud-pubsub: ^1.13
- pda/pheanstalk: ^3.1|^v4
- phpstan/phpstan: ^0.12|^1.0
- phpstan/phpstan-doctrine: ^0.12|^1.0
- phpstan/phpstan-phpunit: ^0.12|^1.0
- phpstan/phpstan-symfony: ^0.12|^1.0
- phpunit/phpunit: ^9
- symfony/browser-kit: ^5.0|^6.0|^7.0
- symfony/dotenv: ^5.0|^6.0|^7.0
- symfony/monolog-bundle: ^3.0|^4.0
- symfony/phpunit-bridge: *
- symfony/proxy-manager-bridge: ^5.0|^6.0|^7.0
- symfony/yaml: ^5.0|^6.0|^7.0
Suggests
- ext-pcntl: For cleaner job handling in runner
- aws/aws-sdk-php: Required to use SQS driver
- doctrine/doctrine-bundle: Required to use doctrine driver
- google/cloud-pubsub: Required to use Pub/Sub driver
- pda/pheanstalk: Required to use beanstalkd driver
- symfony/proxy-manager-bridge: Required to use doctrine driver
- dev-master
- 7.3.0
- 7.2.0
- 7.1.1
- 7.1.0
- 7.0.0
- 6.8.1
- 6.8.0
- 6.7.0
- 6.6.1
- 6.6.0
- 6.5.0
- 6.4.3
- 6.4.2
- 6.4.1
- 6.4.0
- 6.3.0
- 6.2.0
- 6.1.2
- 6.1.1
- 6.1.0
- 6.0.3
- 6.0.2
- 6.0.1
- 6.0.0
- 5.10.0
- 5.9.0
- 5.8.3
- 5.8.2
- 5.8.1
- 5.8.0
- 5.7.2
- 5.7.1
- 5.7.0
- 5.6.1
- 5.6.0
- 5.5.0
- 5.4.5
- 5.4.4
- 5.4.3
- 5.4.2
- 5.4.1
- 5.4.0
- 5.3.1
- 5.3.0
- 5.2.1
- 5.2.0
- 5.1.0
- 5.0.4
- 5.0.3
- 5.0.2
- 5.0.1
- 5.0.0
- 4.1.0
- 4.0.3
- 4.0.2
- 4.0.1
- 4.0.0
- 3.3.3
- 3.3.2
- 3.3.1
- 3.3.0
- 3.2.0
- 3.1.1
- 3.1.0
- 3.0.2
- 3.0.1
- 3.0.0
- 2.3.0
- 2.2.0
- 2.1.0
- 2.0.1
- 2.0.0
- 1.2.1
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.2
- 1.0.1
- 1.0.0
- dev-dependabot/composer/guzzlehttp/psr7-2.4.5
- dev-dependabot/composer/guzzlehttp/guzzle-7.5.0
- dev-dependabot/composer/symfony/http-kernel-6.0.20
- dev-Zizitto-master
- dev-delay-no-orm
This package is auto-updated.
Last update: 2024-10-25 14:57:20 UTC
README
A bundle for running background jobs in Symfony.
This bundle provides a consistent queue interface, with plugable 'drivers' that can schedule jobs using a number of different queue types:
There are also a number of 'helper' plugins:
-
This plugins can schedule jobs far in advance and move them into a real time queue when they should be run. Use in combination with SQS or Beanstalkd which don't support scheduling jobs.
-
Perioidic Jobs
Automatically schedule a jobs to run every hour/day/week or other period. Randomizes the actual time to keep an even server load.
Overview
A job is a Symfony service that implements the Worker
interface. This has a single method execute(array $arguments)
.
The name of the job is the service name.
You add jobs to the queue by calling $container->get(QueueManagerRegistry::class)->put($name, $arguments)
.
Check the documentation of the driver you are using as to how to run the daemon process(es).
Install
Composer
composer require mcfedr/queue-manager-bundle
AppKernel
Include the bundle in your AppKernel
public function registerBundles() { $bundles = [ ... new Mcfedr\QueueManagerBundle\McfedrQueueManagerBundle(),
Config
You must configure one (or more) drivers to use. Generally you will have just one and call it 'default'.
Beanstalk
Usage
The beanstalk runner is a Symfony command. You can runner multiple instances if you need to handle higher numbers of jobs.
./bin/console mcfedr:queue:{name}-runner
Where {name}
is what you used in the config. Add -v
or more to get detailed logs.
Config
mcfedr_queue_manager: managers: default: driver: beanstalkd options: host: 127.0.0.1 port: 11300 default_queue: mcfedr_queue
Supported options to QueueManager::put
queue
- The name of the queue to put the job in.priority
- The job priority.ttr
- Time to run, the time given for a job to finish before it is repeated.time
- A\DateTime
object of when to schedule this job.delay
- Number of seconds from now to schedule this job.
AWS SQS
Usage
The sqs runner is a Symfony command. You can runner multiple instances if you need to handle higher numbers of jobs.
./bin/console mcfedr:queue:{name}-runner
Where {name}
is what you used in the config. Add -v
or more to get detailed logs.
Config
mcfedr_queue_manager: managers: default: driver: sqs options: default_url: https://sqs.eu-west-1.amazonaws.com/... region: eu-west-1 credentials: key: 'my-access-key-id' secret: 'my-secret-access-key' queues: name: https://sqs.eu-west-1.amazonaws.com/... name2: https://sqs.eu-west-1.amazonaws.com/...
default_url
- Default SQS queue url.region
- The region where your queue is. Required if not passingsqs_client
.credentials
optional - Specify your key and secret This is optional because the SDK can pick up your credentials from a variety of places.sqs_client
- Name ofSQSClient
service to use.queues
optional - Allows you to setup a mapping of short names for queues, this makes it easier to use multiple queues and keep the config in one place.
Supported options to QueueManager::put
url
- Astring
with the url of a queue.queue
- Astring
with the name of a queue in the config.time
- A\DateTime
object of when to schedule this job. Note: SQS can delay jobs up to 15 minutes.delay
- Number of seconds from now to schedule this job. Note: SQS can delay jobs up to 15 minutes.ttr
- Number of seconds during which Amazon SQS prevents other consumers from receiving and processing the message (SQS Visibility Timeout).
GCP Pub/Sub
Usage
The pub/sub runner is a Symfony command. You can runner multiple instances if you need to handle higher numbers of jobs.
./bin/console mcfedr:queue:{name}-runner
Where {name}
is what you used in the config. Add -v
or more to get detailed logs.
Config
mcfedr_queue_manager: managers: default: driver: pub_sub options: default_subscription: 'test_sub' default_topic: 'projects/project/topics/test-topic' pub_sub_queues: name1: topic: 'projects/project/topics/test-topic' subscription: 'test_sub'
default_subscription
- Default Pub/Sub subscription to listen to.default_topic
- Default Pub/Sub topic to push to.key_file_path
optional - Specify your key file. This is optional because the SDK can pick up your credentials from a variety of places.pub_sub_client
- Name ofPubSubClient
service to use.pub_sub_queues
optional - Allows you to setup a mapping of short names for queues, this makes it easier to use multiple queues and keep the config in one place. Each queue should have atopic
andsubscription
.
Supported options to QueueManager::put
topic
- Astring
with the url of a queue.queue
- Astring
with the name of a queue in the config.
Periodic
This driver doesn't run jobs, it requires another driver to actually process jobs.
Usage
There is no runner daemon for this driver as it just plugs into other drivers. Use it by
put
ting jobs into this driver with the period
option.
Config
mcfedr_queue_manager: managers: periodic: driver: periodic options: default_manager: delay default_manager_options: []
This will create a QueueManager
service named "mcfedr_queue_manager.periodic"
.
default_manager
- Default job processor, must support delayed jobs, for example Doctrine Delay.default_manager_options
- Default options to pass to job processorput
.
Supported options to QueueManager::put
period
- The average number of seconds between job runs.manager
- Use a different job processor for this job.manager_options
- Options to pass to the processorsput
method.
Doctrine Delay
This driver doesn't run jobs, it requires another driver to actually process jobs.
It currently only works with MySQL as a native query is required to find jobs in a concurrency safe way.
Usage
You should run the daemon for delay in addition to any other daemons you are using. This runner simply moves jobs from Doctrine into your other job queues. Because its not doing much work generally a single instance can cope with a high number of jobs.
./bin/console mcfedr:queue:{name}-runner
Where {name}
is what you used in the config. Add -v
or more to get detailed logs.
Config
mcfedr_queue_manager: managers: delay: driver: doctrine_delay options: entity_manager: default default_manager: default default_manager_options: []
This will create a QueueManager
service named "mcfedr_queue_manager.delay"
.
entity_manager
- Doctrine entity manager to use.default_manager
- Default job processor.default_manager_options
- Default options to pass to job processorput
.
Supported options to QueueManager::put
time
- A\DateTime
object of when to schedule this job.delay
- Number of seconds from now to schedule this job.force_delay
- A boolean that forces the job to be delayed by the specified number of seconds.manager
- Use a different job processor for this job.manager_options
- Options to pass to the processorsput
method.
Note
If delay
or time
option is less then 30 seconds the job will be scheduled for immediate execution unless the force_delay
option is set to true
Tables
After you have installed you will need to do a schema update so that the table of delayed tasks is created.
Additional options
These are the defaults for a number of other options.
mcfedr_queue_manager: retry_limit: 3 sleep_seconds: 5 report_memory: false doctrine_reset: true
Doctrine
To avoid memory leaks entity manager is being reset after job execution.
Resetting a non-lazy manager service is deprecated since Symfony 3.2 and will throw an exception in version 4.0. So if you use Symfony 3.2 or greater you need to install symfony/proxy-manager-bridge to support Lazy Services.
composer require proxy-manager-bridge
Usage
You can access the QueueManagerRegistry
for simple access to your queue.
Just inject QueueManagerRegistry::class
and call put
to add new jobs to the queue.
Also, each manager will be a service you can access with the name "mcfedr_queue_manager.$name"
.
It implements the QueueManager
interface, where you can call just 2 simple methods.
/**
* Put a new job on a queue
*
* @param string $name The service name of the worker that implements {@link \Mcfedr\QueueManagerBundle\Queue\Worker}
* @param array $arguments Arguments to pass to execute - must be json serializable
* @param array $options Options for creating the job - these depend on the driver used
*/
public function put(string $name, array $arguments = [], array $options = []): Job
/**
* Remove a job, you should call this to cancel a job
*
* @param $job
* @throws WrongJobException
* @throws NoSuchJobException
*/
public function delete(Job $job): void;
Jobs
Jobs to run are Symfony services that implement Mcfedr\QueueManagerBundle\Queue\Worker
There is one method, that is called with the arguments you passed to QueueManager::put
.
/**
* Called to start the queued task
*
* @param array $arguments
* @throws \Exception
*/
public function execute(array $arguments): void;
If your job throws an exception it will be retried (assuming the driver supports retrying),
unless the exception thrown is an instance of UnrecoverableJobExceptionInterface
.
Workers should be tagged with mcfedr_queue_manager.worker
, if you are using autowiring this will
happen automatically.
By default the job name is the class, but you can also add tags with specific ids, e.g.
Worker: tags: - { name: 'mcfedr_queue_manager.worker', id: 'test_worker' }
Now you can schedule this job with both:
$queueManager->put(Worker::class, ...) $queueManager->put('test_worker', ...)
Events
A number of events are triggered during the running of jobs.
Creating your own driver
Firstly a driver needs to implement a QueueManager
. This should put tasks into queues.
The options argument can be used to accept any extra parameters specific to your implementation.
For example, this might include a delay
or a priority
if you support that.
You also need to create a Job
class, many drivers can just extend AbstractJob
but you can add any extra data you need.
Creating a runner
Many drivers can use the RunnerCommand
as a base, implementing the getJob
method.
Other queue servers have their own runners, in which case you need to write the code such that the correct worker is called.
The service mcfedr_queue_manager.job_executor
can help with this.