chh / kue
A simple interface to multiple job queue implemenations
Installs: 4 391
Dependents: 1
Suggesters: 0
Security: 0
Stars: 18
Watchers: 5
Forks: 2
Open Issues: 3
Requires
- php: >=5.3.3
- evenement/evenement: ~1.0
Requires (Dev)
- aws/aws-sdk-php: ~2.1.1
- chh/bob: ~1.0@dev
- monolog/monolog: ~1.2
- mtdowling/cron-expression: ~1.0
- phpunit/phpunit: ~3.7
- symfony/console: ~2.1
Suggests
- ext-redis: Enable RedisQueue
- aws/aws-sdk-php: AmazonSqsQueue
- monolog/monolog: Used by the console command
- mtdowling/cron-expression: Enable CRON expressions
- symfony/console: Used by the console command
This package is not auto-updated.
Last update: 2025-01-18 14:21:58 UTC
README
A minimalistic, generic and framework independent interface to job queues
Install
Install via Composer:
% wget http://getcomposer.org/composer.phar
% php composer.phar require chh/kue:*@dev
Use
Jobs
Jobs are classes which implement the Kue\Job
interface. This interface
specifies one method — run()
. The run
method is invoked by the
worker script, which pulls jobs out of the queue.
Why not just implement
callable
?
Because if the contract is that a job must be callable, then every callback (even a Closure) can satisfy it. The problem with this approach is, that workers usually run in an other process than the script that puts the jobs into the queue. Closures and functions can't be serialized, so there's no way to transport them to the worker process. Objects can be serialized in one process and unserialized in another, and retain all their state and context.
Queues
A queue implements the Kue\Queue
interface. This interface looks as
follows:
interface Kue\Queue { function push(Kue\Job $job); function pop(); function flush(); function process(Kue\Worker $worker); }
The queue's responsibility is, to be the transport layer between the user
who pushes jobs into the queue, and the worker script which pulls them
out by polling pop()
for jobs to process.
The flush()
method should be called by the client after jobs have been
pushed, and can be used to send multiple jobs with a more efficient
transport mechanism — like a batch request.
Kue ships with a Kue\LocalQueue
for development environments. This
queue sends jobs to a local network socket on push()
, and receives on
this network socket when pop()
is called.
For production it's recommended to use something like SQS behind the Queue interface.
Workers
Workers take the queue, and start polling with the pop()
method for
jobs.
interface Kue\Worker extends \Evenement\EventEmitterInterface { function process(Kue\Queue $queue); }
Workers should abstract the strategy for processing the queued jobs. The worker implementations which are shipped with Kue should be fully sufficient for most use cases.
Kue ships with these workers out of the box:
Kue\SequentialWorker
— a simple worker which calls the queue'spop()
method in afor
loop, and processes jobs strictly one after the other. This is simpler and works on any platform, but should be only used for development or on Windows.Kue\PreforkingWorker
— an advanced worker, which starts a master process and forks a pool of worker processes, which all callpop()
individually. It can run jobs concurrently and is more fault tolerant, because when a worker dies, the master simply starts up a new one. You will want to use this worker in production. It only works on *nix platforms, like OSX and Linux.
All workers are Evenement Event Emitters and emit following standard events:
init
: Emitted before the job gets run, gets called with the job as argument.success
: A job was processed successfully. The handler receives the job as argument.exception
: An exception occured while running the job. The job and exception are passed as arguments.
The Kue\PreforkingWorker
supports following additional events:
beforeFork
: This is emitted before the worker process gets forked from the parent.afterFork
: Emitted from the child process after it was forked from the master. Use this to reinitialize resources in the worker process.
These workers are typically managed by the Symfony Console Command, which ships with Kue:
use Symfony\Component\Console\Application; use Kue\Command\WorkCommand; use Kue\LocalQueue; $worker = new WorkCommand(new LocalQueue); $app = new Application; $app->add($worker); $app->run();
The queue worker can the be run with the kue:work
command inside the
Symfony Console Application. The worker implementations can be switched
with the -c
flag:
1
: TheKue\SequentialWorker
is used.>1
: TheKue\PreforkingWorker
is used, with a pool ofc
workers.
To attach your own event listeners to the automatically selected worker instance, you can pass an array of event names plus handlers as second argument to the command's constructor:
$webApp = new MyApplication; $worker = new WorkerCommand(new LocalQueue, array( 'init' => function($job) use ($webApp) { $job->application = $webApp; } ));