arus / amqp-bridge
Bridge to AMQP extension for PHP 7.1+ (incl. PHP 8) with support for annotations and JSON Schema
v1.0.2
2021-09-07 11:49 UTC
Requires
- php: ^7.1|^8.0
- ext-amqp: *
- doctrine/annotations: ^1.6
- justinrainbow/json-schema: ^5.0
- psr/log: ^1.0
Requires (Dev)
- phpunit/phpunit: 7.5.20|9.5.0
- sunrise/coding-standard: 1.0.0
This package is auto-updated.
Last update: 2025-01-07 19:13:20 UTC
README
Installation
composer require 'arus/amqp-bridge'
QuickStart
Queue Message Handler
declare(strict_types=1); namespace App\QueueMessageHandler; use Arus\AMQP\Bridge\PayloadDecoder\JsonDecoder; use Arus\AMQP\Bridge\MessageHandlerInterface; use Arus\AMQP\Bridge\MessageInterface; use const JSON_OBJECT_AS_ARRAY; /** * @JsonSchemaReference("config/json-schemas/SomeQueueMessage.json") */ final class SomeQueueMessageHandler implements MessageHandlerInterface { /** * {@inheritDoc} */ public function handle(MessageInterface $message) : void { $data = (new JsonDecoder)->decode($message, JSON_OBJECT_AS_ARRAY); // some code... } }
Message Queue Consumer
use App\QueueMessageHandler\SomeQueueMessageHandler; use Arus\AMQP\Bridge\Consumer; $connection = new AMQPConnection(); $connection->setHost('localhost'); $connection->setPort(5672); $connection->setVhost('/'); $connection->setLogin('guest'); $connection->setPassword('guest'); $connection->connect(); $channel = new AMQPChannel($connection); $channel->setPrefetchCount(100); $queue = new AMQPQueue($channel); $queue->setName('queue.name'); // init the message queue consumer... $consumer = new Consumer(new SomeQueueMessageHandler()); // [optional] set a logger based on PSR-3... $consumer->setLogger($logger); // [optional] set a custom payload validator... $consumer->setPayloadValidator($payloadValidator); // [optional] set a custom annotation reader... $consumer->setAnnotationReader($annotationReader); // [optional] use a JSON schema validator for queue messages... $consumer->useJsonSchemaValidator(); // [optional] set a callback that will be called when a queue message is received... $consumer->setMessageReceivedCallback(function ($message) { // here you can, for example, re-open doctrine entity managers... }); // [optional] set a callback that will be called when a queue message is handled... $consumer->setMessageHandledCallback(function ($message) { // here you can, for example, clear doctrine entity managers... }); try { $queue->consume($consumer); } catch (Throwable $e) { $connection->disconnect(); throw $e; }
Acknowledge, reject and requeue commands
- If a queue message was handled without errors, such a message will be automatically acknowledged;
- If a queue message contains undecodable or invalid payload, such a message will be automatically rejected;
- If a queue message was handled with an unexpected error, such a message will be automatically requeued;
- If you need to reject a queue message in code, just throw an exception
Arus\AMQP\Bridge\Exception\UnacknowledgableQueueMessageExceptionInterface
.
Test run
composer test