diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 514e9b5..a29dda2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: - name: Install Dependencies uses: ramsey/composer-install@v2 - name: Run PHPStan - run: ./vendor/bin/phpstan analyze src --level 0 + run: ./vendor/bin/phpstan analyze src --level 5 test: name: "Run Tests on PHP ${{ matrix.php }} against RabbitMQ ${{ matrix.rabbitmq }} (Composer: ${{ matrix.composer }}; TLS: ${{ matrix.ssl_test }})" runs-on: ubuntu-latest diff --git a/spec/generate.php b/spec/generate.php index 4a6b309..50474eb 100644 --- a/spec/generate.php +++ b/spec/generate.php @@ -1,14 +1,7 @@ connectionClose(\$code, 0, 0, \$reason);\n"; $connectionContent .= " \$this->connection->close();\n"; @@ -353,7 +346,7 @@ function amqpTypeToLength($type, $e) $connectionContent .= " *\n"; $connectionContent .= " * @param AbstractFrame \$frame\n"; $connectionContent .= " */\n"; -$connectionContent .= " private function onFrameReceived(AbstractFrame \$frame)\n"; +$connectionContent .= " private function onFrameReceived(AbstractFrame \$frame): void\n"; $connectionContent .= " {\n"; $connectionContent .= " if (\$frame instanceof MethodConnectionCloseFrame) {\n"; $connectionContent .= " \$this->disconnect(Constants::STATUS_CONNECTION_FORCED, \"Connection closed by server: ({\$frame->replyCode}) \" . \$frame->replyText);\n"; @@ -735,8 +728,9 @@ function amqpTypeToLength($type, $e) $connectionContent .= " \$buffer = \$this->writeBuffer;\n"; if ($class->id === 60 && $method->id === 40) { $connectionContent .= " \$ck = serialize([\$channel, \$headers, \$exchange, \$routingKey, \$mandatory, \$immediate]);\n"; - $connectionContent .= " \$c = isset(\$this->cache[\$ck]) ? \$this->cache[\$ck] : null;\n"; - $connectionContent .= " \$flags = 0; \$off0 = 0; \$len0 = 0; \$off1 = 0; \$len1 = 0; \$contentTypeLength = null; \$contentType = null; \$contentEncodingLength = null; \$contentEncoding = null; \$headersBuffer = null; \$deliveryMode = null; \$priority = null; \$correlationIdLength = null; \$correlationId = null; \$replyToLength = null; \$replyTo = null; \$expirationLength = null; \$expiration = null; \$messageIdLength = null; \$messageId = null; \$timestamp = null; \$typeLength = null; \$type = null; \$userIdLength = null; \$userId = null; \$appIdLength = null; \$appId = null; \$clusterIdLength = null; \$clusterId = null;\n"; + $connectionContent .= " \$c = \$this->cache[\$ck] ?? null;\n"; + $connectionContent .= " \$flags = \$off0 = \$len0 = \$off1 = \$len1 = 0;\n"; + $connectionContent .= " \$contentTypeLength = \$contentType = \$contentEncodingLength = \$contentEncoding = \$headersBuffer = \$deliveryMode = \$priority = \$correlationIdLength = \$correlationId = \$replyToLength = \$replyTo = \$expirationLength = \$expiration = \$messageIdLength = \$messageId = \$timestamp = \$typeLength = \$type = \$userIdLength = \$userId = \$appIdLength = \$appId = \$clusterIdLength = \$clusterId = null;\n"; $connectionContent .= " if (\$c) { \$buffer->append(\$c[0]); }\n"; $connectionContent .= " else {\n"; $connectionContent .= " \$off0 = \$buffer->getLength();\n"; @@ -769,6 +763,7 @@ function amqpTypeToLength($type, $e) // FIXME: respect max body size agreed upon connection.tune $connectionContent .= " \$s = 14;\n"; + $connectionContent .= "\n"; foreach ([ @@ -788,9 +783,8 @@ function amqpTypeToLength($type, $e) ] as $flag => $property ) { list($propertyName, $staticSize, $dynamicSize) = $property; - $connectionContent .= " if (isset(\$headers['{$propertyName}'])) {\n"; + $connectionContent .= " if (\$" . lcfirst(dashedToCamel($propertyName)) . " = \$headers['{$propertyName}'] ?? null) {\n"; $connectionContent .= " \$flags |= {$flag};\n"; - $connectionContent .= " \$" . lcfirst(dashedToCamel($propertyName)) . " = \$headers['{$propertyName}'];\n"; if ($staticSize) { $connectionContent .= " \$s += {$staticSize};\n"; } @@ -799,6 +793,7 @@ function amqpTypeToLength($type, $e) } $connectionContent .= " unset(\$headers['{$propertyName}']);\n"; $connectionContent .= " }\n"; + $connectionContent .= "\n"; } $connectionContent .= " if (!empty(\$headers)) {\n"; @@ -892,10 +887,13 @@ function amqpTypeToLength($type, $e) $connectionContent .= " \$deferred = new Deferred();\n"; $connectionContent .= " \$this->awaitList[] = [\n"; $connectionContent .= " 'filter' => function (Protocol\\AbstractFrame \$frame)" . ($class->id !== 10 ? " use (\$channel)" : "") . ": bool {\n"; - $connectionContent .= " if (\$frame instanceof Protocol\\{$className}" . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n"; - $connectionContent .= " return true;\n"; - $connectionContent .= " }\n"; - $connectionContent .= "\n"; + + if ($class->id !== 10 || $method->id !== 50) { + $connectionContent .= " if (\$frame instanceof Protocol\\{$className}" . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n"; + $connectionContent .= " return true;\n"; + $connectionContent .= " }\n"; + $connectionContent .= "\n"; + } if ($class->id === 60 && $method->id === 71) { $connectionContent .= " if (\$frame instanceof Protocol\\" . str_replace("GetOk", "GetEmpty", $className) . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n"; @@ -968,7 +966,7 @@ function amqpTypeToLength($type, $e) $protocolWriterContent .= "}\n"; file_put_contents(__DIR__ . "/../src/Protocol/ProtocolWriterGenerated.php", $protocolWriterContent); -$connectionContent .= " public function startHeathbeatTimer(): void\n"; +$connectionContent .= " public function startHeartbeatTimer(): void\n"; $connectionContent .= " {\n"; $connectionContent .= " \$this->heartbeatTimer = Loop::addTimer(\$this->options['heartbeat'], [\$this, 'onHeartbeat']);\n"; $connectionContent .= " \$this->connection->on('drain', [\$this, 'onHeartbeat']);\n"; @@ -977,7 +975,7 @@ function amqpTypeToLength($type, $e) $connectionContent .= " /**\n"; $connectionContent .= " * Callback when heartbeat timer timed out.\n"; $connectionContent .= " */\n"; -$connectionContent .= " public function onHeartbeat()\n"; +$connectionContent .= " public function onHeartbeat(): void\n"; $connectionContent .= " {\n"; $connectionContent .= " \$now = microtime(true);\n"; $connectionContent .= " \$nextHeartbeat = (\$this->lastWrite ?: \$now) + \$this->options['heartbeat'];\n"; diff --git a/src/Client.php b/src/Client.php index a74e770..fb7ec19 100644 --- a/src/Client.php +++ b/src/Client.php @@ -211,7 +211,7 @@ public function connect(): self } $this->connection->connectionTuneOk($tune->channelMax, $tune->frameMax, (int)$this->options['heartbeat']); $this->connection->connectionOpen($this->options['vhost']); - $this->connection->startHeathbeatTimer(); + $this->connection->startHeartbeatTimer(); $this->state = ClientStateEnum::CONNECTED; } catch (\Throwable $thrown) { diff --git a/src/Connection.php b/src/Connection.php index ed5a9c7..1d7108d 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -83,7 +83,7 @@ public function __construct( }); } - public function disconnect(int $code, string $reason) + public function disconnect(int $code, string $reason): void { $this->connectionClose($code, 0, 0, $reason); $this->connection->close(); @@ -100,7 +100,7 @@ public function disconnect(int $code, string $reason) * * @param AbstractFrame $frame */ - private function onFrameReceived(AbstractFrame $frame) + private function onFrameReceived(AbstractFrame $frame): void { if ($frame instanceof MethodConnectionCloseFrame) { $this->disconnect(Constants::STATUS_CONNECTION_FORCED, "Connection closed by server: ({$frame->replyCode}) " . $frame->replyText); @@ -363,10 +363,6 @@ public function awaitConnectionClose(): Protocol\MethodConnectionCloseFrame $deferred = new Deferred(); $this->awaitList[] = [ 'filter' => function (Protocol\AbstractFrame $frame): bool { - if ($frame instanceof Protocol\MethodConnectionCloseFrame) { - return true; - } - if ($frame instanceof Protocol\MethodConnectionCloseFrame) { $this->connectionCloseOk(); throw new ClientException($frame->replyText, $frame->replyCode); @@ -1258,8 +1254,9 @@ public function publish(int $channel, string $body, array $headers = [], string { $buffer = $this->writeBuffer; $ck = serialize([$channel, $headers, $exchange, $routingKey, $mandatory, $immediate]); - $c = isset($this->cache[$ck]) ? $this->cache[$ck] : null; - $flags = 0; $off0 = 0; $len0 = 0; $off1 = 0; $len1 = 0; $contentTypeLength = null; $contentType = null; $contentEncodingLength = null; $contentEncoding = null; $headersBuffer = null; $deliveryMode = null; $priority = null; $correlationIdLength = null; $correlationId = null; $replyToLength = null; $replyTo = null; $expirationLength = null; $expiration = null; $messageIdLength = null; $messageId = null; $timestamp = null; $typeLength = null; $type = null; $userIdLength = null; $userId = null; $appIdLength = null; $appId = null; $clusterIdLength = null; $clusterId = null; + $c = $this->cache[$ck] ?? null; + $flags = $off0 = $len0 = $off1 = $len1 = 0; + $contentTypeLength = $contentType = $contentEncodingLength = $contentEncoding = $headersBuffer = $deliveryMode = $priority = $correlationIdLength = $correlationId = $replyToLength = $replyTo = $expirationLength = $expiration = $messageIdLength = $messageId = $timestamp = $typeLength = $type = $userIdLength = $userId = $appIdLength = $appId = $clusterIdLength = $clusterId = null; if ($c) { $buffer->append($c[0]); } else { $off0 = $buffer->getLength(); @@ -1274,94 +1271,95 @@ public function publish(int $channel, string $body, array $headers = [], string $this->writer->appendBits([$mandatory, $immediate], $buffer); $buffer->appendUint8(206); $s = 14; - if (isset($headers['content-type'])) { + + if ($contentType = $headers['content-type'] ?? null) { $flags |= 32768; - $contentType = $headers['content-type']; $s += 1; $s += $contentTypeLength = strlen($contentType); unset($headers['content-type']); } - if (isset($headers['content-encoding'])) { + + if ($contentEncoding = $headers['content-encoding'] ?? null) { $flags |= 16384; - $contentEncoding = $headers['content-encoding']; $s += 1; $s += $contentEncodingLength = strlen($contentEncoding); unset($headers['content-encoding']); } - if (isset($headers['delivery-mode'])) { + + if ($deliveryMode = $headers['delivery-mode'] ?? null) { $flags |= 4096; - $deliveryMode = $headers['delivery-mode']; $s += 1; unset($headers['delivery-mode']); } - if (isset($headers['priority'])) { + + if ($priority = $headers['priority'] ?? null) { $flags |= 2048; - $priority = $headers['priority']; $s += 1; unset($headers['priority']); } - if (isset($headers['correlation-id'])) { + + if ($correlationId = $headers['correlation-id'] ?? null) { $flags |= 1024; - $correlationId = $headers['correlation-id']; $s += 1; $s += $correlationIdLength = strlen($correlationId); unset($headers['correlation-id']); } - if (isset($headers['reply-to'])) { + + if ($replyTo = $headers['reply-to'] ?? null) { $flags |= 512; - $replyTo = $headers['reply-to']; $s += 1; $s += $replyToLength = strlen($replyTo); unset($headers['reply-to']); } - if (isset($headers['expiration'])) { + + if ($expiration = $headers['expiration'] ?? null) { $flags |= 256; - $expiration = $headers['expiration']; $s += 1; $s += $expirationLength = strlen($expiration); unset($headers['expiration']); } - if (isset($headers['message-id'])) { + + if ($messageId = $headers['message-id'] ?? null) { $flags |= 128; - $messageId = $headers['message-id']; $s += 1; $s += $messageIdLength = strlen($messageId); unset($headers['message-id']); } - if (isset($headers['timestamp'])) { + + if ($timestamp = $headers['timestamp'] ?? null) { $flags |= 64; - $timestamp = $headers['timestamp']; $s += 8; unset($headers['timestamp']); } - if (isset($headers['type'])) { + + if ($type = $headers['type'] ?? null) { $flags |= 32; - $type = $headers['type']; $s += 1; $s += $typeLength = strlen($type); unset($headers['type']); } - if (isset($headers['user-id'])) { + + if ($userId = $headers['user-id'] ?? null) { $flags |= 16; - $userId = $headers['user-id']; $s += 1; $s += $userIdLength = strlen($userId); unset($headers['user-id']); } - if (isset($headers['app-id'])) { + + if ($appId = $headers['app-id'] ?? null) { $flags |= 8; - $appId = $headers['app-id']; $s += 1; $s += $appIdLength = strlen($appId); unset($headers['app-id']); } - if (isset($headers['cluster-id'])) { + + if ($clusterId = $headers['cluster-id'] ?? null) { $flags |= 4; - $clusterId = $headers['cluster-id']; $s += 1; $s += $clusterIdLength = strlen($clusterId); unset($headers['cluster-id']); } + if (!empty($headers)) { $flags |= 8192; $this->writer->appendTable($headers, $headersBuffer = new Buffer()); @@ -1849,7 +1847,7 @@ public function awaitConfirmSelectOk(int $channel): Protocol\MethodConfirmSelect return await($deferred->promise()); } - public function startHeathbeatTimer(): void + public function startHeartbeatTimer(): void { $this->heartbeatTimer = Loop::addTimer($this->options['heartbeat'], [$this, 'onHeartbeat']); $this->connection->on('drain', [$this, 'onHeartbeat']); @@ -1858,7 +1856,7 @@ public function startHeathbeatTimer(): void /** * Callback when heartbeat timer timed out. */ - public function onHeartbeat() + public function onHeartbeat(): void { $now = microtime(true); $nextHeartbeat = ($this->lastWrite ?: $now) + $this->options['heartbeat'];