diff --git a/config/kafka.php b/config/kafka.php index 4b7ce0cf..fbb326f5 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -6,6 +6,20 @@ */ 'brokers' => env('KAFKA_BROKERS', 'localhost:9092'), + /* + | Default security protocol + */ + 'securityProtocol' => env('KAFKA_SECURITY_PROTOCOL', 'PLAINTEXT'), + + /* + | Default sasl configuration + */ + 'sasl' => [ + 'mechanisms' => env('KAFKA_MECHANISMS', 'PLAINTEXT'), + 'username' => env('KAFKA_USERNAME', null), + 'password' => env('KAFKA_PASSWORD', null) + ], + /* | Kafka consumers belonging to the same consumer group share a group id. | The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by diff --git a/src/Console/Commands/KafkaConsumer/Options.php b/src/Console/Commands/KafkaConsumer/Options.php index a1e60e5f..5d11a506 100644 --- a/src/Console/Commands/KafkaConsumer/Options.php +++ b/src/Console/Commands/KafkaConsumer/Options.php @@ -14,7 +14,7 @@ class Options private ?int $commit = 1; private ?string $dlq = null; private int $maxMessages = -1; - private ?string $securityProtocol = 'plaintext'; + private ?string $securityProtocol = null; private ?string $saslUsername; private ?string $saslPassword; private ?string $saslMechanisms; @@ -81,13 +81,17 @@ public function getSasl(): ?Sasl username: $this->saslUsername, password: $this->saslPassword, mechanisms: $this->saslMechanisms, - securityProtocol: $this->securityProtocol + securityProtocol: $this->getSecurityProtocol() ); } public function getSecurityProtocol(): ?string { - return $this->securityProtocol; + $securityProtocol = strlen($this->securityProtocol) > 1 + ? $this->securityProtocol + : $this->config['securityProtocol']; + + return $securityProtocol ?? 'plaintext'; } public function getBroker() diff --git a/src/Console/Commands/KafkaConsumerCommand.php b/src/Console/Commands/KafkaConsumerCommand.php index bddbc639..e138454b 100644 --- a/src/Console/Commands/KafkaConsumerCommand.php +++ b/src/Console/Commands/KafkaConsumerCommand.php @@ -53,7 +53,10 @@ public function handle() return; } - $options = new Options($this->options(), $this->config); + + $parsedOptions = array_map(fn ($value) => $value === '?' ? null : $value, $this->options()); + + $options = new Options($parsedOptions, $this->config); $consumer = $options->getConsumer(); $deserializer = $options->getDeserializer();