arus/amqp-bridge

Bridge to AMQP extension for PHP 7.1+ (incl. PHP 8) with support for annotations and JSON Schema

v1.0.2 2021-09-07 11:49 UTC

This package is auto-updated.

Last update: 2025-01-07 19:13:20 UTC


README

Build Status Code Coverage Scrutinizer Code Quality Total Downloads Latest Stable Version License

Installation

composer require 'arus/amqp-bridge'

QuickStart

Queue Message Handler

declare(strict_types=1);

namespace App\QueueMessageHandler;

use Arus\AMQP\Bridge\PayloadDecoder\JsonDecoder;
use Arus\AMQP\Bridge\MessageHandlerInterface;
use Arus\AMQP\Bridge\MessageInterface;

use const JSON_OBJECT_AS_ARRAY;

/**
 * @JsonSchemaReference("config/json-schemas/SomeQueueMessage.json")
 */
final class SomeQueueMessageHandler implements MessageHandlerInterface
{

    /**
     * {@inheritDoc}
     */
    public function handle(MessageInterface $message) : void
    {
        $data = (new JsonDecoder)->decode($message, JSON_OBJECT_AS_ARRAY);

        // some code...
    }
}

Message Queue Consumer

use App\QueueMessageHandler\SomeQueueMessageHandler;
use Arus\AMQP\Bridge\Consumer;

$connection = new AMQPConnection();
$connection->setHost('localhost');
$connection->setPort(5672);
$connection->setVhost('/');
$connection->setLogin('guest');
$connection->setPassword('guest');
$connection->connect();

$channel = new AMQPChannel($connection);
$channel->setPrefetchCount(100);

$queue = new AMQPQueue($channel);
$queue->setName('queue.name');

// init the message queue consumer...
$consumer = new Consumer(new SomeQueueMessageHandler());
// [optional] set a logger based on PSR-3...
$consumer->setLogger($logger);
// [optional] set a custom payload validator...
$consumer->setPayloadValidator($payloadValidator);
// [optional] set a custom annotation reader...
$consumer->setAnnotationReader($annotationReader);
// [optional] use a JSON schema validator for queue messages...
$consumer->useJsonSchemaValidator();
// [optional] set a callback that will be called when a queue message is received...
$consumer->setMessageReceivedCallback(function ($message) {
    // here you can, for example, re-open doctrine entity managers...
});
// [optional] set a callback that will be called when a queue message is handled...
$consumer->setMessageHandledCallback(function ($message) {
    // here you can, for example, clear doctrine entity managers...
});

try {
    $queue->consume($consumer);
} catch (Throwable $e) {
    $connection->disconnect();

    throw $e;
}

Acknowledge, reject and requeue commands

  • If a queue message was handled without errors, such a message will be automatically acknowledged;
  • If a queue message contains undecodable or invalid payload, such a message will be automatically rejected;
  • If a queue message was handled with an unexpected error, such a message will be automatically requeued;
  • If you need to reject a queue message in code, just throw an exception Arus\AMQP\Bridge\Exception\UnacknowledgableQueueMessageExceptionInterface.

Test run

composer test

Useful links