Skip to content

Commit

Permalink
Add "no_ack" consumer option (issue-717) (#718)
Browse files Browse the repository at this point in the history
* Add no_ack consumer option

* Replace consumer_options to options

* Add options for anon consumer

* Add Unit tests for DI

* Update README.md

* Update CHANGELOG
  • Loading branch information
andrey-tech authored Nov 7, 2023
1 parent e105112 commit d2a17ab
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- 2023-11-07
* Add consumer option `no_ack`

- 2021-05-15
* Add possibility to use multiple RabbitMQ hosts

Expand Down
30 changes: 30 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ protected function addConsumers(ArrayNodeDefinition $node)
->end()
->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('options')
->canBeUnset()
->children()
->booleanNode('no_ack')->defaultFalse()->end()
->end()
->end()
->arrayNode('qos_options')
->canBeUnset()
->children()
Expand Down Expand Up @@ -217,6 +223,12 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('timeout_wait')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('options')
->canBeUnset()
->children()
->booleanNode('no_ack')->defaultFalse()->end()
->end()
->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
->children()
Expand Down Expand Up @@ -265,6 +277,12 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
->end()
->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('options')
->canBeUnset()
->children()
->booleanNode('no_ack')->defaultFalse()->end()
->end()
->end()
->arrayNode('qos_options')
->canBeUnset()
->children()
Expand Down Expand Up @@ -311,6 +329,12 @@ protected function addBatchConsumers(ArrayNodeDefinition $node)
->end()
->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('options')
->canBeUnset()
->children()
->booleanNode('no_ack')->defaultFalse()->end()
->end()
->end()
->arrayNode('qos_options')
->children()
->scalarNode('prefetch_size')->defaultValue(0)->end()
Expand Down Expand Up @@ -339,6 +363,12 @@ protected function addAnonConsumers(ArrayNodeDefinition $node)
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->arrayNode('options')
->canBeUnset()
->children()
->booleanNode('no_ack')->defaultFalse()->end()
->end()
->end()
->end()
->end()
->end()
Expand Down
33 changes: 33 additions & 0 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ protected function loadConsumers()
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
if (isset($consumer['options'])) {
$definition->addMethodCall(
'setConsumerOptions',
[$this->normalizeArgumentKeys($consumer['options'])]
);
}

$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
Expand Down Expand Up @@ -349,6 +355,12 @@ protected function loadMultipleConsumers()
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
if (isset($consumer['options'])) {
$definition->addMethodCall(
'setConsumerOptions',
[$this->normalizeArgumentKeys($consumer['options'])]
);
}

$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
Expand Down Expand Up @@ -424,6 +436,12 @@ protected function loadDynamicConsumers()
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
if (isset($consumer['options'])) {
$definition->addMethodCall(
'setConsumerOptions',
[$this->normalizeArgumentKeys($consumer['options'])]
);
}

$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
Expand Down Expand Up @@ -485,6 +503,13 @@ protected function loadBatchConsumers()
$definition->addMethodCall('disableAutoSetupFabric');
}

if (isset($consumer['options'])) {
$definition->addMethodCall(
'setConsumerOptions',
[$this->normalizeArgumentKeys($consumer['options'])]
);
}

if ($consumer['keep_alive']) {
$definition->addMethodCall('keepAlive');
}
Expand Down Expand Up @@ -512,6 +537,14 @@ protected function loadAnonConsumers()
->addTag('old_sound_rabbit_mq.anon_consumer')
->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($anon['exchange_options'])])
->addMethodCall('setCallback', [[new Reference($anon['callback']), 'execute']]);

if (isset($anon['options'])) {
$definition->addMethodCall(
'setConsumerOptions',
[$this->normalizeArgumentKeys($anon['options'])]
);
}

$this->injectConnection($definition, $anon['connection']);
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $anon['connection']);
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ old_sound_rabbit_mq:
exchange_options: {name: 'upload-picture', type: direct}
queue_options: {name: 'upload-picture'}
callback: upload_picture_service
options:
no_ack: false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
```
Here we configure the connection service and the message endpoints that our application will have. In this example your service container will contain the service `old_sound_rabbit_mq.upload_picture_producer` and `old_sound_rabbit_mq.upload_picture_consumer`. The later expects that there's a service called `upload_picture_service`.
Expand Down
13 changes: 13 additions & 0 deletions RabbitMq/BaseAmqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ abstract class BaseAmqp
'declare' => true,
];

protected $consumerOptions = [
'no_ack' => false,
];

/**
* @var EventDispatcherInterface|null
*/
Expand Down Expand Up @@ -155,6 +159,15 @@ public function setQueueOptions(array $options = [])
$this->queueOptions = array_merge($this->queueOptions, $options);
}

/**
* @param array $options
* @return void
*/
public function setConsumerOptions(array $options = [])
{
$this->consumerOptions = array_merge($this->consumerOptions, $options);
}

/**
* @param string $routingKey
* @return void
Expand Down
2 changes: 1 addition & 1 deletion RabbitMq/BaseConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected function setupConsumer()
if ($this->autoSetupFabric) {
$this->setupFabric();
}
$this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, [$this, 'processMessage']);
$this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, $this->consumerOptions['no_ack'], false, false, [$this, 'processMessage']);
}

public function processMessage(AMQPMessage $msg)
Expand Down
2 changes: 1 addition & 1 deletion RabbitMq/BatchConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ protected function setupConsumer()
$this->setupFabric();
}

$this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, [$this, 'processMessage']);
$this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, $this->consumerOptions['no_ack'], false, false, [$this, 'processMessage']);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion RabbitMq/MultipleConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected function setupConsumer()
//PHP 5.3 Compliant
$currentObject = $this;

$this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function (AMQPMessage $msg) use ($currentObject, $name) {
$this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, $this->consumerOptions['no_ack'], false, false, function (AMQPMessage $msg) use ($currentObject, $name) {
$currentObject->processQueueMessage($name, $msg);
});
}
Expand Down
11 changes: 11 additions & 0 deletions Tests/DependencyInjection/Fixtures/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ old_sound_rabbit_mq:
- 'android.#.upload'
- 'iphone.upload'
callback: foo.callback
options:
no_ack: true

default_consumer:
exchange_options:
Expand Down Expand Up @@ -169,6 +171,8 @@ old_sound_rabbit_mq:
- 'iphone.upload'
callback: foo.multiple_test2.callback
queues_provider: foo.queues_provider
options:
no_ack: true

dynamic_consumers:
foo_dyn_consumer:
Expand All @@ -178,16 +182,21 @@ old_sound_rabbit_mq:
type: direct
callback: foo.dynamic.callback
queue_options_provider: foo.dynamic.provider
options:
no_ack: true

bar_dyn_consumer:
connection: bar_default
exchange_options:
name: bar_dynamic_exchange
type: direct
callback: bar.dynamic.callback
queue_options_provider: bar.dynamic.provider

bindings:
- {exchange: foo, destination: bar, routing_key: baz}
- {exchange: moo, connection: default2, destination: cow, nowait: true, destination_is_exchange: true, arguments: {moo: cow}}

anon_consumers:
foo_anon_consumer:
connection: foo_connection
Expand All @@ -202,6 +211,8 @@ old_sound_rabbit_mq:
arguments: null
ticket: null
callback: foo_anon.callback
options:
no_ack: true

default_anon_consumer:
exchange_options:
Expand Down
32 changes: 32 additions & 0 deletions Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ public function testFooConsumerDefinition()
'setTimeoutWait',
[3],
],
[
'setConsumerOptions',
[
[
'no_ack' => true,
],
],
],
],
$definition->getMethodCalls()
);
Expand Down Expand Up @@ -670,6 +678,14 @@ public function testMultipleConsumerDefinition()
'setTimeoutWait',
[3],
],
[
'setConsumerOptions',
[
[
'no_ack' => true,
],
],
],
],
$definition->getMethodCalls()
);
Expand Down Expand Up @@ -714,6 +730,14 @@ public function testDynamicConsumerDefinition()
new Reference('foo.dynamic.provider'),
],
],
[
'setConsumerOptions',
[
[
'no_ack' => true,
],
],
],
],
$definition->getMethodCalls()
);
Expand Down Expand Up @@ -750,6 +774,14 @@ public function testFooAnonConsumerDefinition()
'setCallback',
[[new Reference('foo_anon.callback'), 'execute']],
],
[
'setConsumerOptions',
[
[
'no_ack' => true,
],
],
],
],
$definition->getMethodCalls()
);
Expand Down

0 comments on commit d2a17ab

Please sign in to comment.