thomasvargiu / amqpal
AMQP Abstraction Layer
Requires
- php: ^5.5 || ^7.0
- php-amqplib/php-amqplib: >=2.6.1,<3.0.0
Requires (Dev)
- ext-amqp: *
- fabpot/php-cs-fixer: ^1.11
- phpmd/phpmd: ^2.4
- phpunit/phpunit: ^4.8 || ^5.2
- richardfullmer/rabbitmq-management-api: ^1.0
- squizlabs/php_codesniffer: ^2.5
Suggests
- ext-amqp: Required for better performance in message consuming
This package is auto-updated.
Last update: 2022-02-01 12:56:40 UTC
README
AMQP Abstraction Layer
An abstraction layer that exposes one API on top of different AMQP client adapters.
Supported adapters:
- php-amqplib (phpamqplib in the factory)
- php-amqp extension (amqp in the factory)
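Since ext-amqp is only a suggested dependency, one way to pick between the two factory names at runtime is sketched below. The helpers `chooseAdapterName()` and `buildAdapterOptions()` are hypothetical and not part of AMQPAL; the only things taken from this README are the two adapter names and the `name` / `options` array layout. The code avoids scalar type hints so that it stays within the PHP 5.5 constraint listed above.

```php
<?php
// Hypothetical helpers for illustration only; they are NOT part of AMQPAL.

/**
 * Return 'amqp' when the php-amqp extension is available, else 'phpamqplib'.
 * Passing true/false overrides the detection (handy in tests).
 */
function chooseAdapterName($extensionLoaded = null)
{
    if ($extensionLoaded === null) {
        $extensionLoaded = extension_loaded('amqp');
    }

    return $extensionLoaded ? 'amqp' : 'phpamqplib';
}

/**
 * Build the array layout shown in this README for the adapter factory,
 * rejecting names other than the two supported adapters.
 */
function buildAdapterOptions($name, array $connection)
{
    $supported = ['amqp', 'phpamqplib'];
    if (!in_array($name, $supported, true)) {
        throw new InvalidArgumentException(
            sprintf('Unsupported adapter "%s"', $name)
        );
    }

    return [
        'name' => $name,
        'options' => $connection,
    ];
}

// Connection values copied from the README example.
$options = buildAdapterOptions(chooseAdapterName(), [
    'host' => 'localhost',
    'username' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
]);
```

The resulting `$options` array has the same shape as the one handed to the adapter factory in the example that follows.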
Example
    use AMQPAL\Adapter;
    use AMQPAL\Options;

    $options = [
        'name' => 'amqp', // or phpamqplib
        'options' => [
            'host' => 'localhost',
            'username' => 'guest',
            'password' => 'guest',
            'vhost' => '/'
        ]
    ];

    $factory = new Adapter\AdapterFactory();
    $adapter = $factory->createAdapter($options);

    $connection = $adapter->getConnection();
    $channel = $connection->createChannel();

    /*
     * Creating exchange...
     */
    $exchangeOptions = new Options\ExchangeOptions([
        'name' => 'exchange-name',
        'type' => 'direct'
    ]);

    $exchange = $channel->createExchange($exchangeOptions);

    // or:
    $exchange = $channel->createExchange([
        'name' => 'exchange-name',
        'type' => 'direct'
    ]);

    /*
     * Creating queue...
     */
    $queueOptions = new Options\QueueOptions([
        'name' => 'queue-name',
    ]);

    $queue = $channel->createQueue($queueOptions);

    // or:
    $queue = $channel->createQueue([
        'name' => 'queue-name',
    ]);

    $queue->declareQueue();
    $queue->bind('exchange-name');

    // publishing a message...
    $exchange->publish('my message in the queue');

    // get the next message in the queue...
    $message = $queue->get();

    // or consuming a queue...
    $callback = function (Adapter\Message $message, Adapter\QueueInterface $queue) {
        // ack the message...
        $queue->ack($message->getDeliveryTag());

        // return false to stop consuming...
        return false;
    };

    // set channel qos to fetch just one message at a time
    $channel->setQos(null, 1);

    // and consuming...
    $queue->consume($callback); // This is a blocking function
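The callback above stops after the first message because it always returns false. A possible variation that processes a fixed number of messages before stopping is sketched here. `makeLimitedConsumer()` is a hypothetical helper, not part of AMQPAL; it relies only on `ack()` and `getDeliveryTag()` as used in the example, and it assumes that returning true keeps the consumer running, which this README does not state explicitly. Type hints on the closure parameters are left out so the sketch runs without the library installed.

```php
<?php
// Hypothetical helper for illustration only; it is NOT part of AMQPAL.

/**
 * Build a consume callback that hands each message to $handler, acks it,
 * and returns false (stop consuming) once $limit messages were processed.
 */
function makeLimitedConsumer($limit, callable $handler)
{
    $seen = 0;

    return function ($message, $queue) use (&$seen, $limit, $handler) {
        // Process first, so a handler exception leaves the message unacked.
        $handler($message);
        $queue->ack($message->getDeliveryTag());

        $seen++;

        // false stops consuming (per the README example);
        // true is ASSUMED to mean "keep consuming".
        return $seen < $limit;
    };
}
```

With the objects from the example, this could be used as `$queue->consume(makeLimitedConsumer(10, $handler));`, where `$handler` is any callable that accepts the message.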