corley / queue-mysql
0.0.2
2017-01-27 07:40 UTC
Requires
- ext-pdo: *
Requires (Dev)
- corley/queue: ~1
- phpunit/phpunit: ~4.0
This package is not auto-updated.
Last update: 2025-01-18 22:35:49 UTC
README
To use this package you need the pdo_mysql
extension
$adapter = new Corley\Queue\MySQL\MySQL($pdo);
Features:
delay
- A message is visible after a delaytimeout
- A message is invisible for a timeout after a receiveretention
- A message is removed after a given amount of time
In the base queue you can set those parameters as defaults but you can override them using options
$queue->send("message", [ "delay" => 20 ]); // visible after 20 seconds list($rpt, $msg) = $queue->receive(["timeout" => 900]); // invisible for 900 seconds
Database schema
CREATE TABLE queues ( id INT(11) NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL, default_delay INT(11) NOT NULL DEFAULT 0, default_timeout INT(11) NOT NULL, retention INT(11) NOT NULL, PRIMARY KEY(id) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ALTER TABLE `queues` ADD UNIQUE KEY `queues_unique_name` (`name`); CREATE TABLE messages ( id INT(11) NOT NULL AUTO_INCREMENT, queue_id INT(11) NOT NULL, body LONGBLOB NOT NULL, consumer VARCHAR(255) DEFAULT NULL, timeout INT(11) DEFAULT NULL, read_on INT(11) DEFAULT NULL, inserted_on INT(11) NOT NULL, read_times INT(11) NOT NULL DEFAULT 0, PRIMARY KEY(id), KEY `queue_idx` (`queue_id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ALTER TABLE `messages` ADD UNIQUE KEY `consumer_unique_name` (`consumer`); ALTER TABLE messages ADD CONSTRAINT fk_queue_id FOREIGN KEY (queue_id) REFERENCES queues(id) ON DELETE CASCADE ON UPDATE CASCADE;
Create a new queue
INSERT INTO `queues` (name, default_timeout, default_delay, retention) VALUES ('example', 900, 20, 5760);
Example
You should use corley/queue:~1
$pdo = new PDO("mysql:dbname=test;host=127.0.0.1", "root", "root"); $adapter = new Corley\Queue\MySQL\MySQL($pdo); $queue = new Corley\Queue\Queue("example", $adapter); $queue->send("Hello"); list($receipt, $message) = $queue->receive(); // [1, "Hello"] $queue->delete($receipt);