symfony-bundles / kafka-bundle
Symfony Kafka Bundle
Type: symfony-bundle
Requires
- php: ^7.4
- ext-json: *
- ext-mbstring: *
- ext-pcntl: *
- ext-rdkafka: >=4.0
- psr/log: ^1.1
- symfony/console: ^5.0
- symfony/framework-bundle: ^5.0
- symfony/yaml: ^5.0
Requires (Dev)
- kwn/php-rdkafka-stubs: ^2.0
- phpstan/phpstan: ^0.12
- phpunit/php-code-coverage: ^9.2
- phpunit/phpunit: ^9.5
This package is auto-updated.
Last update: 2025-01-16 23:17:28 UTC
README
How to use
- Install package
composer req mgid/kafka-bundle
- Create consumer. Example (src/Consumer/EmailSendConsumer.php):
<?php

namespace App\Consumer;

use Swift_Mailer;
use Mgid\KafkaBundle\Command\Consumer;

class EmailSendConsumer extends Consumer
{
    public const QUEUE_NAME = 'email_send_queue';

    /**
     * @var Swift_Mailer
     */
    private $mailer;

    /**
     * @required
     *
     * @param Swift_Mailer $mailer
     */
    public function setMailer(Swift_Mailer $mailer)
    {
        $this->mailer = $mailer;
    }

    /**
     * {@inheritdoc}
     */
    protected function onMessage(array $data): void
    {
        $message = (new \Swift_Message($data['subject']))
            ->setFrom($data['sender'])
            ->setTo($data['recipient'])
            ->setBody($data['body']);

        $this->mailer->send($message);
    }
}
- Produce message. Example (src/Service/EmailService.php):
<?php

namespace App\Service;

use App\Consumer\EmailSendConsumer;
use Mgid\KafkaBundle\DependencyInjection\Traits\ProducerTrait;

class EmailService
{
    use ProducerTrait;

    /**
     * @param array $data
     */
    public function send(array $data): void
    {
        // Publish to the queue that EmailSendConsumer reads from.
        $this->producer->send(EmailSendConsumer::QUEUE_NAME, $data);
    }
}
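The consumer example reads four keys from the message: `subject`, `sender`, `recipient` and `body`. A minimal sketch of building such a payload before handing it to `EmailService::send()` might look like the following. The `buildEmailPayload()` helper and the example addresses are hypothetical and not part of the bundle.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): builds the array that
 * EmailSendConsumer::onMessage() expects to receive.
 */
function buildEmailPayload(string $sender, string $recipient, string $subject, string $body): array
{
    return [
        'subject' => $subject,
        'sender' => $sender,
        'recipient' => $recipient,
        'body' => $body,
    ];
}

// Example values are placeholders.
$payload = buildEmailPayload('noreply@example.com', 'user@example.com', 'Welcome', 'Hello!');

// In a service or controller that has EmailService injected:
// $emailService->send($payload);
```

Keeping the payload construction in one place makes it easier to keep the producer and the consumer in agreement on the key names.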
- Run consumer. Example:
php bin/console app:consumer:email-send
Default configuration
# config/packages/mgid_kafka.yaml
mgid_kafka:
    producers:
        configuration:
            group.id: 'main_group'
            log.connection.close: 'false'
            metadata.broker.list: '%env(KAFKA_BROKERS)%'
            queue.buffering.max.messages: 100000
    consumers:
        configuration:
            group.id: 'main_group'
            auto.offset.reset: 'smallest'
            log.connection.close: 'false'
            metadata.broker.list: '%env(KAFKA_BROKERS)%'
Read more about supported configuration properties: librdkafka configuration.
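The keys under `configuration` are librdkafka property names. As a rough illustration, the sketch below applies the default consumer properties to an `RdKafka\Conf` object from ext-rdkafka. It assumes the bundle passes these values through more or less unchanged, which this README does not confirm, and the `localhost:9092` fallback is a placeholder.

```php
<?php

declare(strict_types=1);

// Consumer properties copied from the default configuration.
$properties = [
    'group.id' => 'main_group',
    'auto.offset.reset' => 'smallest',
    'log.connection.close' => 'false',
    // Fallback broker address is an assumption for local experiments.
    'metadata.broker.list' => getenv('KAFKA_BROKERS') ?: 'localhost:9092',
];

// Only touch RdKafka classes when the extension is actually loaded.
if (extension_loaded('rdkafka')) {
    $conf = new RdKafka\Conf();
    foreach ($properties as $name => $value) {
        // RdKafka\Conf::set() takes both the name and the value as strings.
        $conf->set($name, (string) $value);
    }
}
```

This is also why boolean-like values such as `log.connection.close` appear as quoted strings in the YAML: librdkafka properties are set as strings.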