Skip to content

Commit

Permalink
Fix partition assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Jan 9, 2024
1 parent 03271f2 commit a35fa40
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,18 @@ public function consume(): void
'conf' => $this->setConf($this->config->getProducerOptions()),
]);

$this->committer = $this->committerFactory->make($this->consumer, $this->config);

// Calling `subscribe` overrides the assigned topic partitions, so we
// should check if there are any assignment defined before calling
// the subscribe method on the consumer. Partition assignment
// have precedence over topic subscriptions.
if ($this->config->shouldAssignTopicPartitions()) {
$this->consumer->assign($this->config->getPartitionAssigment());
} else {
$this->consumer->subscribe($this->config->getTopics());
}

$this->committer = $this->committerFactory->make($this->consumer, $this->config);

$this->consumer->subscribe($this->config->getTopics());

$batchConfig = $this->config->getBatchConfig();

if ($batchConfig->isBatchingEnabled()) {
Expand Down

0 comments on commit a35fa40

Please sign in to comment.