diff --git a/.docs/README.md b/.docs/README.md
index f947115..51d196b 100644
--- a/.docs/README.md
+++ b/.docs/README.md
@@ -35,6 +35,7 @@ rabbitmq:
host: localhost
port: 5672
lazy: false
+ heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send
queues:
testQueue:
diff --git a/README.md b/README.md
index 667fc21..48c404a 100644
--- a/README.md
+++ b/README.md
@@ -31,15 +31,14 @@ For details on how to use this package, check out our [documentation](.docs).
| State | Version | Branch | Nette | PHP |
|--------|-----------|----------|----------|---------|
-| dev | `^10.x.x` | `master` | `3.0+` | `>=8.0` |
-| stable | `^10.0.0` | `master` | `3.0+` | `>=8.0` |
+| dev | `^10.2.x` | `master` | `3.0+` | `>=8.0` |
+| stable | `^10.1.0` | `master` | `3.0+` | `>=8.0` |
## Development
This package is currently maintaining by these authors.
-
-----
diff --git a/src/Connection/Client.php b/src/Connection/Client.php
index 5bf17ae..d2b6765 100644
--- a/src/Connection/Client.php
+++ b/src/Connection/Client.php
@@ -24,9 +24,14 @@ class Client extends BunnyClient
*/
public function __construct(array $options = [])
{
- // Construct parent
parent::__construct($options);
- $this->options['cycle_callback'] = $options['cycle_callback'];
+
+ $this->options['cycle_callback'] = is_callable($options['cycle_callback'])
+ ? $options['cycle_callback']
+ : null;
+ $this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback'])
+ ? $options['heartbeat_callback']
+ : null;
}
/**
@@ -37,9 +42,7 @@ public function sendHeartbeat(): void
$this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();
- if (is_callable($this->options['heartbeat_callback'] ?? null)) {
- $this->options['heartbeat_callback']->call($this);
- }
+ $this->options['heartbeat_callback']?->call($this);
}
public function syncDisconnect(): bool
@@ -73,7 +76,7 @@ protected function write(): void
{
parent::write();
if (($last = error_get_last()) !== null) {
- if (!Strings::match($last['message'], '~fwrite(): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) {
+ if (!Strings::match($last['message'], '~fwrite\(\): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) {
return;
}
@@ -101,10 +104,7 @@ public function run($maxSeconds = null): void
}
do {
- if (is_callable($this->options['cycle_callback'] ?? null)) {
- $this->options['cycle_callback']->call($this);
- }
-
+ $this->options['cycle_callback']?->call($this);
if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php
index fafbc0b..f66daff 100644
--- a/src/Connection/Connection.php
+++ b/src/Connection/Connection.php
@@ -40,7 +40,8 @@ public function __construct(
bool $tcpNoDelay,
bool $lazy = false,
?array $ssl = null,
- ?callable $cycleCallback = null
+ ?callable $cycleCallback = null,
+ ?callable $heartbeatCallback = null,
) {
$this->connectionParams = [
'host' => $host,
@@ -56,6 +57,7 @@ public function __construct(
'tcp_nodelay' => $tcpNoDelay,
'ssl' => $ssl,
'cycle_callback' => $cycleCallback,
+ 'heartbeat_callback' => $heartbeatCallback,
];
$this->bunnyClient = $this->createNewConnection();
diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php
index 4a5af40..b38e451 100644
--- a/src/Connection/ConnectionFactory.php
+++ b/src/Connection/ConnectionFactory.php
@@ -117,7 +117,8 @@ private function create(string $name): IConnection
$connectionData['tcpNoDelay'],
$connectionData['lazy'],
$connectionData['ssl'],
- fn () => $this->sendHeartbeat()
+ fn () => $this->sendHeartbeat(),
+ $connectionData['heartbeatCallback'] ?? null,
);
}
}
diff --git a/src/DI/Helpers/ConnectionsHelper.php b/src/DI/Helpers/ConnectionsHelper.php
index 272df5f..63e6e70 100644
--- a/src/DI/Helpers/ConnectionsHelper.php
+++ b/src/DI/Helpers/ConnectionsHelper.php
@@ -29,6 +29,7 @@ public function getConfigSchema(): Schema
'tcpNoDelay' => Expect::bool(false),
'lazy' => Expect::bool(true),
'ssl' => Expect::array(null)->required(false),
+ 'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'),
'admin' => Expect::structure([
'port' => Expect::int(15672),
'secure' => Expect::bool(false),
diff --git a/src/DI/Helpers/ConsumersHelper.php b/src/DI/Helpers/ConsumersHelper.php
index f3bbb3e..eb9a2d9 100644
--- a/src/DI/Helpers/ConsumersHelper.php
+++ b/src/DI/Helpers/ConsumersHelper.php
@@ -18,7 +18,7 @@ public function getConfigSchema(): Schema
return Expect::arrayOf(
Expect::structure([
'queue' => Expect::string()->required(true),
- 'callback' => Expect::array()->required(true),
+ 'callback' => Expect::array()->required(true)->assert('is_callable'),
'idleTimeout' => Expect::int(30),
'bulk' => Expect::structure([
'size' => Expect::int()->min(1),
diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php
index c2a8c5e..e4341b9 100644
--- a/src/Producer/Producer.php
+++ b/src/Producer/Producer.php
@@ -8,8 +8,6 @@
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;
-use Nette\Utils\Callback;
-use Tracy\Dumper;
final class Producer
{
@@ -39,11 +37,11 @@ public function publish(string $message, array $headers = [], ?string $routingKe
$headers = array_merge($this->getBasicHeaders(), $headers);
if ($this->queue !== null) {
- $this->publishToQueue($message, $headers);
+ $this->tryPublish($this->queue, $message, $headers, '', $this->queue->getName());
}
if ($this->exchange !== null) {
- $this->publishToExchange($message, $headers, $routingKey ?? '');
+ $this->tryPublish($this->exchange, $message, $headers, $this->exchange->getName(), $routingKey ?? '');
}
foreach ($this->publishCallbacks as $callback) {
@@ -58,18 +56,6 @@ public function addOnPublishCallback(callable $callback): void
}
- public function sendHeartbeat(): void
- {
- trigger_error(__METHOD__ . '() is deprecated, use dependency ConnectionFactory::sendHeartbeat().', E_USER_DEPRECATED);
- if ($this->queue !== null) {
- $this->queue->getConnection()->sendHeartbeat();
- }
- if ($this->exchange !== null) {
- $this->exchange->getConnection()->sendHeartbeat();
- }
- }
-
-
/**
* @return array
*/
@@ -81,30 +67,6 @@ private function getBasicHeaders(): array
];
}
- /**
- * @param array $headers
- */
- private function publishToQueue(string $message, array $headers = []): void
- {
- if (null === $queue = $this->queue) {
- throw new \UnexpectedValueException('Queue is not defined');
- }
-
- $this->tryPublish($queue, $message, $headers, '', $queue->getName());
- }
-
-
- /**
- * @param array $headers
- */
- private function publishToExchange(string $message, array $headers, string $routingKey): void
- {
- if (null === $exchange = $this->exchange) {
- throw new \UnexpectedValueException('Exchange is not defined');
- }
-
- $this->tryPublish($exchange, $message, $headers, $exchange->getName(), $routingKey);
- }
/**
* @param array $headers