This bundle aims to provide a simple Kafka transport for Symfony Messenger. Kafka REST Proxy support coming soon.
Open a command console, enter your project directory and execute:
$ composer require koco/messenger-kafka
After adding the composer requirement, enable the bundle by adding it to the list of registered bundles
in the config/bundles.php
file of your project:
return [
// ...
Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];
Specify a DSN starting with either kafka://
or kafka+ssl://
. Multiple brokers are separated by ,
.
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093
The configuration options for kafka_conf
and topic_conf
can be found here.
It is highly recommended to set enable.auto.offset.store
to false
for consumers. Otherwise, every message will be acknowledged, regardless of any error thrown by the message handlers.
framework:
messenger:
transports:
producer:
dsn: '%env(KAFKA_URL)%'
# serializer: App\Infrastructure\Messenger\MySerializer
options:
flushTimeout: 10000
flushRetries: 5
topic:
name: 'events'
kafka_conf:
security.protocol: 'sasl_ssl'
ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
sasl.username: '%env(KAFKA_SASL_USERNAME)%'
sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
sasl.mechanisms: 'SCRAM-SHA-256'
consumer:
dsn: '%env(KAFKA_URL)%'
# serializer: App\Infrastructure\Messenger\MySerializer
options:
commitAsync: true
receiveTimeout: 10000
topic:
name: "events"
kafka_conf:
enable.auto.offset.store: 'false'
group.id: 'my-group-id' # should be unique per consumer
security.protocol: 'sasl_ssl'
ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
sasl.username: '%env(KAFKA_SASL_USERNAME)%'
sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
sasl.mechanisms: 'SCRAM-SHA-256'
max.poll.interval.ms: '45000'
topic_conf:
auto.offset.reset: 'earliest'
You will most likely want to implement your own Serializer. Please see: https://symfony.com/doc/current/messenger.html#serializing-messages
<?php
namespace App\Infrastructure\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
final class MySerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
// ...
}
public function encode(Envelope $envelope): array
{
// ...
}
}
Same as with the basic example above, you need to build your own serializer.
Within the decode()
and encode()
you can make use of flix-tech/avro-serde-php.