sylius-labs / rabbitmq-simplebus-bundle
Integrates RabbitMQ with SimpleBus.
Type: symfony-bundle
Requires
- php: ^7.1
- php-amqplib/rabbitmq-bundle: ^1.12
- simple-bus/message-bus: ^3.0
- simple-bus/symfony-bridge: ^5.1
- symfony/monolog-bundle: ^3.1
This package is auto-updated.
Last update: 2025-01-06 10:10:34 UTC
README
Transforms AMQP messages received from RabbitMQ into events handled by SimpleBus.
Installation
- Require this package:
$ composer require sylius-labs/rabbitmq-simplebus-bundle
- Add the bundle to AppKernel.php:

    public function registerBundles()
    {
        $bundles = [
            new \SyliusLabs\RabbitMqSimpleBusBundle\RabbitMqSimpleBusBundle(),
        ];

        return array_merge(parent::registerBundles(), $bundles);
    }
Usage
- Create a custom denormalizer for your AMQP messages:

    <?php
    // src/Acme/CustomDenormalizer.php

    namespace Acme;

    use PhpAmqpLib\Message\AMQPMessage;
    use SyliusLabs\RabbitMqSimpleBusBundle\Denormalizer\DenormalizationFailedException;
    use SyliusLabs\RabbitMqSimpleBusBundle\Denormalizer\DenormalizerInterface;

    class CustomDenormalizer implements DenormalizerInterface
    {
        // A message is supported when its body decodes as JSON.
        public function supports(AMQPMessage $message)
        {
            return null !== json_decode($message->getBody(), true);
        }

        // Turns the decoded body into an event for SimpleBus.
        // CustomEvent is your own event class in the Acme namespace.
        public function denormalize(AMQPMessage $message)
        {
            if (!$this->supports($message)) {
                throw new DenormalizationFailedException('Unsupported message!');
            }

            return new CustomEvent(json_decode($message->getBody(), true));
        }
    }
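The denormalizer above returns a CustomEvent, which this README does not define, and something on the SimpleBus side has to react to it. The following is a minimal sketch of both pieces, not code shipped with the bundle. The shape of CustomEvent, the CustomEventSubscriber class and its getReceived() helper are hypothetical names chosen for illustration, and the `notify` method name assumes SimpleBus's default subscriber method resolution.

```php
<?php

namespace Acme;

/**
 * Hypothetical event class: the name matches the CustomEvent returned by
 * the denormalizer, but its shape is an assumption made for illustration.
 */
final class CustomEvent
{
    /** @var array Decoded JSON body of the AMQP message */
    private $payload;

    public function __construct(array $payload)
    {
        $this->payload = $payload;
    }

    public function getPayload(): array
    {
        return $this->payload;
    }
}

/**
 * Hypothetical subscriber: it only collects payloads, where a real one
 * would call your domain services.
 */
final class CustomEventSubscriber
{
    /** @var array[] Payloads of the events received so far */
    private $received = [];

    // Assumption: SimpleBus calls notify() when no method is configured.
    public function notify(CustomEvent $event): void
    {
        $this->received[] = $event->getPayload();
    }

    public function getReceived(): array
    {
        return $this->received;
    }
}
```

With simple-bus/symfony-bridge, a subscriber like this is normally registered as a service tagged `event_subscriber` with a `subscribes_to` attribute naming the event class. Note that the constructor is typed as `array`, so a scalar JSON body such as `5` would pass the sample supports() check but fail here; adding an is_array() check to supports() would close that gap.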
- Tag your denormalizer service with rabbitmq_simplebus.amqp_denormalizer:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- app/config/services.xml -->
    <container xmlns="http://symfony.com/schema/dic/services"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
        <services>
            <service id="acme.custom_denormalizer" class="Acme\CustomDenormalizer">
                <tag name="rabbitmq_simplebus.amqp_denormalizer" />
            </service>
        </services>
    </container>

  The same definition in YAML:

    # app/config/services.yml
    services:
        acme.custom_denormalizer:
            class: Acme\CustomDenormalizer
            tags:
                - { name: rabbitmq_simplebus.amqp_denormalizer }
- Configure the RabbitMQ consumer:

    # app/config/config.yml
    old_sound_rabbit_mq:
        connections:
            default:
                host: 'localhost'
                port: 5672
                user: 'guest'
                password: 'guest'
        consumers:
            rabbitmq_simplebus:
                connection: default
                exchange_options: { name: 'rabbitmq-simplebus', type: direct }
                queue_options: { name: 'rabbitmq-simplebus' }
                callback: rabbitmq_simplebus.consumer
- Run RabbitMQ consumer:
$ bin/console rabbitmq:consumer rabbitmq_simplebus
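To check the setup end to end, it can help to publish a JSON message by hand while the consumer is running. The sketch below uses php-amqplib directly and is only an illustration: encodeBody() and publishTestMessage() are hypothetical helper names, the host, credentials and exchange name are copied from the sample config.yml, and it assumes the consumer has already declared the exchange and bound the queue with an empty routing key.

```php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Encodes a payload as the JSON body that the sample denormalizer accepts.
 * Hypothetical helper, not part of the bundle.
 */
function encodeBody(array $payload): string
{
    return json_encode($payload);
}

/**
 * Publishes one JSON message to the exchange from the sample config.
 * Hypothetical helper; connection settings mirror config.yml.
 */
function publishTestMessage(array $payload): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Assumption: the exchange already exists because the consumer was
    // started first, and the queue is bound with an empty routing key.
    $channel->basic_publish(new AMQPMessage(encodeBody($payload)), 'rabbitmq-simplebus');

    $channel->close();
    $connection->close();
}
```

Calling `publishTestMessage(['id' => 7]);` from a script that loads Composer's autoloader should then produce one message that the running consumer picks up and passes to your denormalizer.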