workerman/redis-queue

Message queue system written in PHP based on workerman and backed by Redis.

v1.2.1 2025-01-02 09:21 UTC

This package is auto-updated.

Last update: 2025-01-02 09:23:03 UTC


README

Message queue system written in PHP based on workerman and backed by Redis.

Install

composer require workerman/redis-queue

Usage

test.php

<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');

    $client->subscribe('user-1', function($data) {
        echo "user-1\n";
        var_export($data);
    });

    $client->subscribe('user-2', function($data) {
        echo "user-2\n";
        var_export($data);
    });

    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "consume failure\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });

    Timer::add(1, function() use ($client) {
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

Run with command php test.php start or php test.php start -d.

API

__construct (string $address, [array $options])

Create an instance by $address and $options.

  • $address for example redis://ip:6379.

  • $options is the client connection options. Defaults:

    • auth: default ''
    • db: default 0
    • retry_seconds: Retry interval after consumption failure
    • max_attempts: Maximum number of retries after consumption failure

send(String $queue, Mixed $data, [int $dely=0])

Send a message to a queue

  • $queue is the queue to publish to, String
  • $data is the message to publish, Mixed
  • $dely is delay seconds for delayed consumption, Int

subscribe(mixed $queue, callable $callback)

Subscribe to a queue or queues

  • $queue is a String queue or an Array which has as keys the queue name to subscribe.
  • $callback - function (Mixed $data), $data is the data sent by send($queue, $data).

unsubscribe(mixed $queue)

Unsubscribe from a queue or queues

onConsumeFailure(callable $callback)

When consumption fails onConsumeFailure is triggered.

  • $callback - function (\Throwable $exception, array $package), $package contains information such as data queue attempts