qxsch / worker-pool
Runs tasks in a parallel processing workerpool.
Fund package maintenance!
qxsch
Installs: 296 384
Dependents: 2
Suggesters: 1
Security: 0
Stars: 104
Watchers: 10
Forks: 22
Open Issues: 2
Requires
- php: >=7.1.0
- ext-pcntl: *
- ext-posix: *
- ext-sockets: *
- ext-sysvsem: *
Requires (Dev)
- phpunit/phpunit: >=4.0.0
Suggests
- php: >=8.0.0
- jeremeamia/SuperClosure: >=1.0.1
This package is not auto-updated.
Last update: 2025-01-13 00:40:15 UTC
README
Parallel Processing WorkerPool for PHP
10K downloads within 4 months, thank you very much! We add features as users request them.
Examples
The WorkerPool class provides a very simple interface to pass data to a worker pool and have it processed. You can at any time fetch the results from the workers. Each worker child can receive and return any value that can be serialized.
A simple example
<?php

/**
 * Job executed by every worker child of the pool.
 *
 * @param mixed                     $input     the value passed to WorkerPool::run()
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject              $storage   a persistent storage for the current child process
 */
$job = function ($input, $semaphore, $storage) {
    echo "[" . getmypid() . "]" . " hi $input\n";

    // simulated workload
    sleep(rand(1, 3));

    // return null instead when no data should be passed back to the parent
    return $input;
};

$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->create(new \QXS\WorkerPool\ClosureWorker($job));

// hand the inputs 0..9 over to the pool
foreach (range(0, 9) as $taskNumber) {
    $wp->run($taskNumber);
}

// block until every worker has finished
$wp->waitForAllWorkers();

// print each returned value (var_dump($result) works as an alternative)
foreach ($wp as $result) {
    echo $result->dump() . "\n";
}
A more sophisticated example
<?php

use QXS\WorkerPool\Semaphore;
use QXS\WorkerPool\WorkerInterface;
use QXS\WorkerPool\WorkerPool;

/**
 * Example worker: greets its input, sleeps for a random time and
 * occasionally fails with an exception.
 */
class MyWorker implements WorkerInterface
{
    /** @var Semaphore the semaphore received in onProcessCreate() */
    protected $sem;

    /**
     * Called after the worker has been forked into another process.
     *
     * @param Semaphore $semaphore the semaphore to run synchronized tasks
     * @throws \Exception in case of a processing error
     */
    public function onProcessCreate(Semaphore $semaphore)
    {
        // keep the semaphore so that run() can use it to synchronize the workers
        $this->sem = $semaphore;

        echo "\t[" . getmypid() . "] has been created.\n";

        // seed mt_rand from the current microtime
        [$microPart, $secondPart] = explode(' ', microtime());
        $seed = (float) $secondPart + ((float) $microPart * 100000);
        mt_srand((int) $seed);
    }

    /**
     * Called before the worker process is destroyed.
     *
     * @throws \Exception in case of a processing error
     */
    public function onProcessDestroy()
    {
        echo "\t[" . getmypid() . "] will be destroyed.\n";
    }

    /**
     * Runs the work.
     *
     * @param mixed $input the (serializable) data that the worker should process
     * @return mixed the (serializable) result; return null when no data should be passed to the parent
     * @throws \Exception in case of a processing error
     */
    public function run($input)
    {
        $input = (string) $input;
        echo "\t[" . getmypid() . "] Hi $input\n";

        // simulated workload
        sleep(mt_rand(0, 10));

        // now and then a task fails
        if (mt_rand(0, 10) === 9) {
            throw new \RuntimeException('We have a problem for ' . $input . '.');
        }

        return "Hi $input";
    }
}

$wp = new WorkerPool();
$wp->setWorkerPoolSize(10)
    ->create(new MyWorker());

// produce 50 tasks
foreach (range(1, 50) as $taskNumber) {
    $wp->run($taskNumber);
}

// some statistics (the separator string contains a literal line break)
echo "Busy Workers:" . $wp->getBusyWorkers() . " 
Free Workers:" . $wp->getFreeWorkers() . "\n";

// wait for completion of all tasks
$wp->waitForAllWorkers();

// collect all the results: plain data, a worker exception or a pool exception
foreach ($wp as $val) {
    if (isset($val['data'])) {
        echo "RESULT: " . $val['data'] . "\n";
        continue;
    }

    if (isset($val['workerException'])) {
        echo "WORKER EXCEPTION: " . $val['workerException']['class'] . ": " . $val['workerException']['message'] . "\n" . $val['workerException']['trace'] . "\n";
        continue;
    }

    if (isset($val['poolException'])) {
        echo "POOL EXCEPTION: " . $val['poolException']['class'] . ": " . $val['poolException']['message'] . "\n" . $val['poolException']['trace'] . "\n";
    }
}

// write something before the parent exits
echo "ByeBye\n";
Synchronize your workers
In case you need to access shared resources, you can synchronize your workers.
<?php

/**
 * Job that prints two greetings, each one inside a synchronized section.
 *
 * @param mixed                     $input     the value passed to WorkerPool::run()
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject              $storage   a persistent storage for the current child process
 */
$job = function ($input, $semaphore, $storage) {
    // Variant A: explicit begin/end. The finally block makes sure that
    // synchronizedEnd() is called even when the guarded code throws.
    $semaphore->synchronizedBegin();
    try {
        // synchronized across all workers: just one worker at a time gets here
        echo "[A][" . getmypid() . "]" . " hi $input\n";
    } finally {
        $semaphore->synchronizedEnd();
    }

    // Variant B: pass the guarded code as a callback
    $semaphore->synchronize(function () use ($input, $storage) {
        // synchronized across all workers: just one worker at a time gets here
        echo "[B][" . getmypid() . "]" . " hi $input\n";
    });

    // simulated workload
    sleep(rand(1, 3));

    return $input;
};

$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->create(new \QXS\WorkerPool\ClosureWorker($job));

// hand the inputs 0..9 over to the pool
foreach (range(0, 9) as $taskNumber) {
    $wp->run($taskNumber);
}

// block until every worker has finished
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    var_dump($result);
}
Disable semaphore (ability to synchronize workers)
You can disable the semaphore. Some people complained about semaphores being opened that they do not need at all.
<?php

/**
 * Job executed by every worker child of the pool.
 *
 * @param mixed                     $input     the value passed to WorkerPool::run()
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject              $storage   a persistent storage for the current child process
 */
$job = function ($input, $semaphore, $storage) {
    echo "[" . getmypid() . "]" . " hi $input\n";

    // simulated workload
    sleep(rand(1, 3));

    // return null instead when no data should be passed back to the parent
    return $input;
};

$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    // disables the semaphore support (you can still use it in the worker, but it will have no effect)
    ->disableSemaphore()
    ->create(new \QXS\WorkerPool\ClosureWorker($job));

// hand the inputs 0..9 over to the pool
foreach (range(0, 9) as $taskNumber) {
    $wp->run($taskNumber);
}

// block until every worker has finished
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    var_dump($result);
}
Automatic respawn
You can choose to automatically respawn dead workers.
<?php

/**
 * Job that greets its input and sometimes terminates its own process.
 *
 * @param mixed                     $input     the value passed to WorkerPool::run()
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject              $storage   a persistent storage for the current child process
 */
$job = function ($input, $semaphore, $storage) {
    echo "[" . getmypid() . "]" . " hi $input\n";

    // simulated workload
    sleep(rand(1, 3));

    // simulate an unexpected worker death
    if (rand(1, 10) > 5) {
        exit;
    }

    // return null instead when no data should be passed back to the parent
    return $input;
};

$wp = new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->respawnAutomatically()
    ->create(new \QXS\WorkerPool\ClosureWorker($job));

// hand the inputs 0..9 over to the pool
foreach (range(0, 9) as $taskNumber) {
    $wp->run($taskNumber);
}

// block until every worker has finished
$wp->waitForAllWorkers();

// dump the returned values
foreach ($wp as $result) {
    var_dump($result);
}
Each time a worker dies, a new one will be created with an incremented index.
You should avoid situations in which a worker dies, but the respawn capability can be a useful workaround until you fix the underlying problem.
Transparent output to ps
See what's happening when you run ps:
root 2378 \_ simpleExample.php: Parent
root 2379 \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root 2380 \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root 2381 \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root 2382 \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]
Documentation
The documentation can be found here http://qxsch.github.io/WorkerPool/doc/