diff --git a/.docs/README.md b/.docs/README.md index f947115..51d196b 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -35,6 +35,7 @@ rabbitmq: host: localhost port: 5672 lazy: false + heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send queues: testQueue: diff --git a/README.md b/README.md index 667fc21..48c404a 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,14 @@ For details on how to use this package, check out our [documentation](.docs). | State | Version | Branch | Nette | PHP | |--------|-----------|----------|----------|---------| -| dev | `^10.x.x` | `master` | `3.0+` | `>=8.0` | -| stable | `^10.0.0` | `master` | `3.0+` | `>=8.0` | +| dev | `^10.2.x` | `master` | `3.0+` | `>=8.0` | +| stable | `^10.1.0` | `master` | `3.0+` | `>=8.0` | ## Development This package is currently maintaining by these authors. - ----- diff --git a/src/Connection/Client.php b/src/Connection/Client.php index 94c18f7..d2b6765 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -8,10 +8,32 @@ use Bunny\ClientStateEnum; use Bunny\Exception\BunnyException; use Bunny\Exception\ClientException; +use Bunny\Protocol\AbstractFrame; use Bunny\Protocol\HeartbeatFrame; +use Nette\Utils\Strings; +/** + * @codeCoverageIgnore + */ class Client extends BunnyClient { + /** + * Constructor. + * + * @param array $options + */ + public function __construct(array $options = []) + { + parent::__construct($options); + + $this->options['cycle_callback'] = is_callable($options['cycle_callback']) + ? $options['cycle_callback'] + : null; + $this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback']) + ? $options['heartbeat_callback'] + : null; + } + /** * @throws BunnyException */ @@ -19,6 +41,8 @@ public function sendHeartbeat(): void { $this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer); $this->flushWriteBuffer(); + + $this->options['heartbeat_callback']?->call($this); } public function syncDisconnect(): bool @@ -44,7 +68,102 @@ public function syncDisconnect(): bool } $this->init(); - + return true; } + + protected function write(): void + { + parent::write(); + if (($last = error_get_last()) !== null) { + if (!Strings::match($last['message'], '~fwrite\(\): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) { + return; + } + + error_clear_last(); + throw new ClientException('Broken pipe or closed connection.'); + } + } + + /** + * Runs it's own event loop, processes frames as they arrive. Processes messages for at most $maxSeconds. + * + * @param float $maxSeconds + */ + public function run($maxSeconds = null): void + { + if (!$this->isConnected()) { + throw new ClientException('Client has to be connected.'); + } + + $this->running = true; + $startTime = microtime(true); + $stopTime = null; + if ($maxSeconds !== null) { + $stopTime = $startTime + $maxSeconds; + } + + do { + $this->options['cycle_callback']?->call($this); + if (!empty($this->queue)) { + $frame = array_shift($this->queue); + } else { + /** @var AbstractFrame|null $frame */ + $frame = $this->reader->consumeFrame($this->readBuffer); + if ($frame === null) { + $now = microtime(true); + $nextStreamSelectTimeout = ($this->lastWrite ?: $now) + $this->options["heartbeat"]; + if ($stopTime !== null && $stopTime < $nextStreamSelectTimeout) { + $nextStreamSelectTimeout = $stopTime; + } + $tvSec = max((int)($nextStreamSelectTimeout - $now), 0); + $tvUsec = max((int)(($nextStreamSelectTimeout - $now - $tvSec) * 1000000), 0); + + $r = [$this->getStream()]; + $w = null; + $e = null; + + if (($n = @stream_select($r, $w, $e, $tvSec, $tvUsec)) === false) { + $lastError = error_get_last(); + if ($lastError !== null && + preg_match('/^stream_select\\(\\): unable to select \\[(\\d+)\\]:/', $lastError['message'], $m) && + (int)$m[1] === PCNTL_EINTR + ) { + // got interrupted by signal, dispatch signals & continue + pcntl_signal_dispatch(); + $n = 0; + } else { + throw new ClientException(sprintf( + 'stream_select() failed: %s', + $lastError ? $lastError['message'] : 'Unknown error.' + )); + } + } + $now = microtime(true); + if ($stopTime !== null && $now >= $stopTime) { + break; + } + + if ($n > 0) { + $this->feedReadBuffer(); + } + + continue; + } + } + + /** @var AbstractFrame $frame */ + if ($frame->channel === 0) { + $this->onFrameReceived($frame); + } else { + if (!isset($this->channels[$frame->channel])) { + throw new ClientException( + "Received frame #{$frame->type} on closed channel #{$frame->channel}." + ); + } + + $this->channels[$frame->channel]->onFrameReceived($frame); + } + } while ($this->running); + } } diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index 376c2a8..f66daff 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -7,8 +7,6 @@ use Bunny\Channel; use Bunny\Exception\ClientException; use Contributte\RabbitMQ\Connection\Exception\ConnectionException; -use Contributte\RabbitMQ\Exchange\IExchange; -use Contributte\RabbitMQ\Queue\IQueue; final class Connection implements IConnection { @@ -16,6 +14,7 @@ final class Connection implements IConnection private const HEARTBEAT_INTERVAL = 1; private Client $bunnyClient; + /** * @var array */ @@ -25,8 +24,8 @@ final class Connection implements IConnection private ?Channel $channel = null; /** - * @throws \Exception * @param array $ssl + * @throws \Exception */ public function __construct( string $host, @@ -40,7 +39,9 @@ public function __construct( string $path, bool $tcpNoDelay, bool $lazy = false, - ?array $ssl = null + ?array $ssl = null, + ?callable $cycleCallback = null, + ?callable $heartbeatCallback = null, ) { $this->connectionParams = [ 'host' => $host, @@ -55,6 +56,8 @@ public function __construct( 'path' => $path, 'tcp_nodelay' => $tcpNoDelay, 'ssl' => $ssl, + 'cycle_callback' => $cycleCallback, + 'heartbeat_callback' => $heartbeatCallback, ]; $this->bunnyClient = $this->createNewConnection(); @@ -117,7 +120,6 @@ public function getChannel(): Channel return $this->channel; } - /** * @throws \Exception */ @@ -130,7 +132,6 @@ public function connectIfNeeded(): void $this->bunnyClient->connect(); } - public function sendHeartbeat(): void { $now = time(); @@ -140,7 +141,6 @@ public function sendHeartbeat(): void } } - public function getVhost(): string { return $this->connectionParams['vhost']; diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 41a3631..b38e451 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -56,6 +56,14 @@ public function getApi(string $name): IApi return $this->requests[$name]; } + public function sendHeartbeat(): bool + { + foreach ($this->connections as $connection) { + $connection->sendHeartbeat(); + } + return true; + } + /** * @throws ConnectionFactoryException */ @@ -108,7 +116,9 @@ private function create(string $name): IConnection $connectionData['path'], $connectionData['tcpNoDelay'], $connectionData['lazy'], - $connectionData['ssl'] + $connectionData['ssl'], + fn () => $this->sendHeartbeat(), + $connectionData['heartbeatCallback'] ?? null, ); } } diff --git a/src/DI/Helpers/ConnectionsHelper.php b/src/DI/Helpers/ConnectionsHelper.php index 272df5f..63e6e70 100644 --- a/src/DI/Helpers/ConnectionsHelper.php +++ b/src/DI/Helpers/ConnectionsHelper.php @@ -29,6 +29,7 @@ public function getConfigSchema(): Schema 'tcpNoDelay' => Expect::bool(false), 'lazy' => Expect::bool(true), 'ssl' => Expect::array(null)->required(false), + 'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'), 'admin' => Expect::structure([ 'port' => Expect::int(15672), 'secure' => Expect::bool(false), diff --git a/src/DI/Helpers/ConsumersHelper.php b/src/DI/Helpers/ConsumersHelper.php index f3bbb3e..eb9a2d9 100644 --- a/src/DI/Helpers/ConsumersHelper.php +++ b/src/DI/Helpers/ConsumersHelper.php @@ -18,7 +18,7 @@ public function getConfigSchema(): Schema return Expect::arrayOf( Expect::structure([ 'queue' => Expect::string()->required(true), - 'callback' => Expect::array()->required(true), + 'callback' => Expect::array()->required(true)->assert('is_callable'), 'idleTimeout' => Expect::int(30), 'bulk' => Expect::structure([ 'size' => Expect::int()->min(1), diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index c2a8c5e..e4341b9 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -8,8 +8,6 @@ use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; use Contributte\RabbitMQ\Queue\IQueue; -use Nette\Utils\Callback; -use Tracy\Dumper; final class Producer { @@ -39,11 +37,11 @@ public function publish(string $message, array $headers = [], ?string $routingKe $headers = array_merge($this->getBasicHeaders(), $headers); if ($this->queue !== null) { - $this->publishToQueue($message, $headers); + $this->tryPublish($this->queue, $message, $headers, '', $this->queue->getName()); } if ($this->exchange !== null) { - $this->publishToExchange($message, $headers, $routingKey ?? ''); + $this->tryPublish($this->exchange, $message, $headers, $this->exchange->getName(), $routingKey ?? ''); } foreach ($this->publishCallbacks as $callback) { @@ -58,18 +56,6 @@ public function addOnPublishCallback(callable $callback): void } - public function sendHeartbeat(): void - { - trigger_error(__METHOD__ . '() is deprecated, use dependency ConnectionFactory::sendHeartbeat().', E_USER_DEPRECATED); - if ($this->queue !== null) { - $this->queue->getConnection()->sendHeartbeat(); - } - if ($this->exchange !== null) { - $this->exchange->getConnection()->sendHeartbeat(); - } - } - - /** * @return array */ @@ -81,30 +67,6 @@ private function getBasicHeaders(): array ]; } - /** - * @param array $headers - */ - private function publishToQueue(string $message, array $headers = []): void - { - if (null === $queue = $this->queue) { - throw new \UnexpectedValueException('Queue is not defined'); - } - - $this->tryPublish($queue, $message, $headers, '', $queue->getName()); - } - - - /** - * @param array $headers - */ - private function publishToExchange(string $message, array $headers, string $routingKey): void - { - if (null === $exchange = $this->exchange) { - throw new \UnexpectedValueException('Exchange is not defined'); - } - - $this->tryPublish($exchange, $message, $headers, $exchange->getName(), $routingKey); - } /** * @param array $headers