Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate better connections #152

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Install Dependencies
uses: ramsey/composer-install@v2
- name: Run PHPStan
run: ./vendor/bin/phpstan analyze src --level 0
run: ./vendor/bin/phpstan analyze src --level 5
test:
name: "Run Tests on PHP ${{ matrix.php }} against RabbitMQ ${{ matrix.rabbitmq }} (Composer: ${{ matrix.composer }}; TLS: ${{ matrix.ssl_test }})"
runs-on: ubuntu-latest
Expand Down
36 changes: 17 additions & 19 deletions spec/generate.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
<?php
namespace Bunny;

use Bunny\Exception\ClientException;
use Bunny\Protocol\AbstractFrame;
use Bunny\Protocol\ContentBodyFrame;
use Bunny\Protocol\ContentHeaderFrame;
use Bunny\Protocol\HeartbeatFrame;
use React\EventLoop\Loop;
use React\Promise\Deferred;
use function React\Async\await;

require_once __DIR__ . "/../vendor/autoload.php";

Expand Down Expand Up @@ -336,7 +329,7 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " });\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";
$connectionContent .= " public function disconnect(int \$code, string \$reason)\n";
$connectionContent .= " public function disconnect(int \$code, string \$reason): void\n";
$connectionContent .= " {\n";
$connectionContent .= " \$this->connectionClose(\$code, 0, 0, \$reason);\n";
$connectionContent .= " \$this->connection->close();\n";
Expand All @@ -353,7 +346,7 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " *\n";
$connectionContent .= " * @param AbstractFrame \$frame\n";
$connectionContent .= " */\n";
$connectionContent .= " private function onFrameReceived(AbstractFrame \$frame)\n";
$connectionContent .= " private function onFrameReceived(AbstractFrame \$frame): void\n";
$connectionContent .= " {\n";
$connectionContent .= " if (\$frame instanceof MethodConnectionCloseFrame) {\n";
$connectionContent .= " \$this->disconnect(Constants::STATUS_CONNECTION_FORCED, \"Connection closed by server: ({\$frame->replyCode}) \" . \$frame->replyText);\n";
Expand Down Expand Up @@ -735,8 +728,9 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " \$buffer = \$this->writeBuffer;\n";
if ($class->id === 60 && $method->id === 40) {
$connectionContent .= " \$ck = serialize([\$channel, \$headers, \$exchange, \$routingKey, \$mandatory, \$immediate]);\n";
$connectionContent .= " \$c = isset(\$this->cache[\$ck]) ? \$this->cache[\$ck] : null;\n";
$connectionContent .= " \$flags = 0; \$off0 = 0; \$len0 = 0; \$off1 = 0; \$len1 = 0; \$contentTypeLength = null; \$contentType = null; \$contentEncodingLength = null; \$contentEncoding = null; \$headersBuffer = null; \$deliveryMode = null; \$priority = null; \$correlationIdLength = null; \$correlationId = null; \$replyToLength = null; \$replyTo = null; \$expirationLength = null; \$expiration = null; \$messageIdLength = null; \$messageId = null; \$timestamp = null; \$typeLength = null; \$type = null; \$userIdLength = null; \$userId = null; \$appIdLength = null; \$appId = null; \$clusterIdLength = null; \$clusterId = null;\n";
$connectionContent .= " \$c = \$this->cache[\$ck] ?? null;\n";
$connectionContent .= " \$flags = \$off0 = \$len0 = \$off1 = \$len1 = 0;\n";
$connectionContent .= " \$contentTypeLength = \$contentType = \$contentEncodingLength = \$contentEncoding = \$headersBuffer = \$deliveryMode = \$priority = \$correlationIdLength = \$correlationId = \$replyToLength = \$replyTo = \$expirationLength = \$expiration = \$messageIdLength = \$messageId = \$timestamp = \$typeLength = \$type = \$userIdLength = \$userId = \$appIdLength = \$appId = \$clusterIdLength = \$clusterId = null;\n";
$connectionContent .= " if (\$c) { \$buffer->append(\$c[0]); }\n";
$connectionContent .= " else {\n";
$connectionContent .= " \$off0 = \$buffer->getLength();\n";
Expand Down Expand Up @@ -769,6 +763,7 @@ function amqpTypeToLength($type, $e)

// FIXME: respect max body size agreed upon connection.tune
$connectionContent .= " \$s = 14;\n";
$connectionContent .= "\n";


foreach ([
Expand All @@ -788,9 +783,8 @@ function amqpTypeToLength($type, $e)
] as $flag => $property
) {
list($propertyName, $staticSize, $dynamicSize) = $property;
$connectionContent .= " if (isset(\$headers['{$propertyName}'])) {\n";
$connectionContent .= " if (\$" . lcfirst(dashedToCamel($propertyName)) . " = \$headers['{$propertyName}'] ?? null) {\n";
$connectionContent .= " \$flags |= {$flag};\n";
$connectionContent .= " \$" . lcfirst(dashedToCamel($propertyName)) . " = \$headers['{$propertyName}'];\n";
if ($staticSize) {
$connectionContent .= " \$s += {$staticSize};\n";
}
Expand All @@ -799,6 +793,7 @@ function amqpTypeToLength($type, $e)
}
$connectionContent .= " unset(\$headers['{$propertyName}']);\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";
}

$connectionContent .= " if (!empty(\$headers)) {\n";
Expand Down Expand Up @@ -892,10 +887,13 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " \$deferred = new Deferred();\n";
$connectionContent .= " \$this->awaitList[] = [\n";
$connectionContent .= " 'filter' => function (Protocol\\AbstractFrame \$frame)" . ($class->id !== 10 ? " use (\$channel)" : "") . ": bool {\n";
$connectionContent .= " if (\$frame instanceof Protocol\\{$className}" . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n";
$connectionContent .= " return true;\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";

if ($class->id !== 10 || $method->id !== 50) {
$connectionContent .= " if (\$frame instanceof Protocol\\{$className}" . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n";
$connectionContent .= " return true;\n";
$connectionContent .= " }\n";
$connectionContent .= "\n";
}

if ($class->id === 60 && $method->id === 71) {
$connectionContent .= " if (\$frame instanceof Protocol\\" . str_replace("GetOk", "GetEmpty", $className) . ($class->id !== 10 ? " && \$frame->channel === \$channel" : "") . ") {\n";
Expand Down Expand Up @@ -968,7 +966,7 @@ function amqpTypeToLength($type, $e)
$protocolWriterContent .= "}\n";
file_put_contents(__DIR__ . "/../src/Protocol/ProtocolWriterGenerated.php", $protocolWriterContent);

$connectionContent .= " public function startHeathbeatTimer(): void\n";
$connectionContent .= " public function startHeartbeatTimer(): void\n";
$connectionContent .= " {\n";
$connectionContent .= " \$this->heartbeatTimer = Loop::addTimer(\$this->options['heartbeat'], [\$this, 'onHeartbeat']);\n";
$connectionContent .= " \$this->connection->on('drain', [\$this, 'onHeartbeat']);\n";
Expand All @@ -977,7 +975,7 @@ function amqpTypeToLength($type, $e)
$connectionContent .= " /**\n";
$connectionContent .= " * Callback when heartbeat timer timed out.\n";
$connectionContent .= " */\n";
$connectionContent .= " public function onHeartbeat()\n";
$connectionContent .= " public function onHeartbeat(): void\n";
$connectionContent .= " {\n";
$connectionContent .= " \$now = microtime(true);\n";
$connectionContent .= " \$nextHeartbeat = (\$this->lastWrite ?: \$now) + \$this->options['heartbeat'];\n";
Expand Down
2 changes: 1 addition & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public function connect(): self
}
$this->connection->connectionTuneOk($tune->channelMax, $tune->frameMax, (int)$this->options['heartbeat']);
$this->connection->connectionOpen($this->options['vhost']);
$this->connection->startHeathbeatTimer();
$this->connection->startHeartbeatTimer();

$this->state = ClientStateEnum::CONNECTED;
} catch (\Throwable $thrown) {
Expand Down
70 changes: 34 additions & 36 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public function __construct(
});
}

public function disconnect(int $code, string $reason)
public function disconnect(int $code, string $reason): void
{
$this->connectionClose($code, 0, 0, $reason);
$this->connection->close();
Expand All @@ -100,7 +100,7 @@ public function disconnect(int $code, string $reason)
*
* @param AbstractFrame $frame
*/
private function onFrameReceived(AbstractFrame $frame)
private function onFrameReceived(AbstractFrame $frame): void
{
if ($frame instanceof MethodConnectionCloseFrame) {
$this->disconnect(Constants::STATUS_CONNECTION_FORCED, "Connection closed by server: ({$frame->replyCode}) " . $frame->replyText);
Expand Down Expand Up @@ -363,10 +363,6 @@ public function awaitConnectionClose(): Protocol\MethodConnectionCloseFrame
$deferred = new Deferred();
$this->awaitList[] = [
'filter' => function (Protocol\AbstractFrame $frame): bool {
if ($frame instanceof Protocol\MethodConnectionCloseFrame) {
return true;
}

if ($frame instanceof Protocol\MethodConnectionCloseFrame) {
$this->connectionCloseOk();
throw new ClientException($frame->replyText, $frame->replyCode);
Expand Down Expand Up @@ -1258,8 +1254,9 @@ public function publish(int $channel, string $body, array $headers = [], string
{
$buffer = $this->writeBuffer;
$ck = serialize([$channel, $headers, $exchange, $routingKey, $mandatory, $immediate]);
$c = isset($this->cache[$ck]) ? $this->cache[$ck] : null;
$flags = 0; $off0 = 0; $len0 = 0; $off1 = 0; $len1 = 0; $contentTypeLength = null; $contentType = null; $contentEncodingLength = null; $contentEncoding = null; $headersBuffer = null; $deliveryMode = null; $priority = null; $correlationIdLength = null; $correlationId = null; $replyToLength = null; $replyTo = null; $expirationLength = null; $expiration = null; $messageIdLength = null; $messageId = null; $timestamp = null; $typeLength = null; $type = null; $userIdLength = null; $userId = null; $appIdLength = null; $appId = null; $clusterIdLength = null; $clusterId = null;
jeromegamez marked this conversation as resolved.
Show resolved Hide resolved
$c = $this->cache[$ck] ?? null;
$flags = $off0 = $len0 = $off1 = $len1 = 0;
$contentTypeLength = $contentType = $contentEncodingLength = $contentEncoding = $headersBuffer = $deliveryMode = $priority = $correlationIdLength = $correlationId = $replyToLength = $replyTo = $expirationLength = $expiration = $messageIdLength = $messageId = $timestamp = $typeLength = $type = $userIdLength = $userId = $appIdLength = $appId = $clusterIdLength = $clusterId = null;
if ($c) { $buffer->append($c[0]); }
else {
$off0 = $buffer->getLength();
Expand All @@ -1274,94 +1271,95 @@ public function publish(int $channel, string $body, array $headers = [], string
$this->writer->appendBits([$mandatory, $immediate], $buffer);
$buffer->appendUint8(206);
$s = 14;
if (isset($headers['content-type'])) {

if ($contentType = $headers['content-type'] ?? null) {
$flags |= 32768;
$contentType = $headers['content-type'];
$s += 1;
$s += $contentTypeLength = strlen($contentType);
unset($headers['content-type']);
}
if (isset($headers['content-encoding'])) {

if ($contentEncoding = $headers['content-encoding'] ?? null) {
$flags |= 16384;
$contentEncoding = $headers['content-encoding'];
$s += 1;
$s += $contentEncodingLength = strlen($contentEncoding);
unset($headers['content-encoding']);
}
if (isset($headers['delivery-mode'])) {

if ($deliveryMode = $headers['delivery-mode'] ?? null) {
$flags |= 4096;
$deliveryMode = $headers['delivery-mode'];
$s += 1;
unset($headers['delivery-mode']);
}
if (isset($headers['priority'])) {

if ($priority = $headers['priority'] ?? null) {
$flags |= 2048;
$priority = $headers['priority'];
$s += 1;
unset($headers['priority']);
}
if (isset($headers['correlation-id'])) {

if ($correlationId = $headers['correlation-id'] ?? null) {
$flags |= 1024;
$correlationId = $headers['correlation-id'];
$s += 1;
$s += $correlationIdLength = strlen($correlationId);
unset($headers['correlation-id']);
}
if (isset($headers['reply-to'])) {

if ($replyTo = $headers['reply-to'] ?? null) {
$flags |= 512;
$replyTo = $headers['reply-to'];
$s += 1;
$s += $replyToLength = strlen($replyTo);
unset($headers['reply-to']);
}
if (isset($headers['expiration'])) {

if ($expiration = $headers['expiration'] ?? null) {
$flags |= 256;
$expiration = $headers['expiration'];
$s += 1;
$s += $expirationLength = strlen($expiration);
unset($headers['expiration']);
}
if (isset($headers['message-id'])) {

if ($messageId = $headers['message-id'] ?? null) {
$flags |= 128;
$messageId = $headers['message-id'];
$s += 1;
$s += $messageIdLength = strlen($messageId);
unset($headers['message-id']);
}
if (isset($headers['timestamp'])) {

if ($timestamp = $headers['timestamp'] ?? null) {
$flags |= 64;
$timestamp = $headers['timestamp'];
$s += 8;
unset($headers['timestamp']);
}
if (isset($headers['type'])) {

if ($type = $headers['type'] ?? null) {
$flags |= 32;
$type = $headers['type'];
$s += 1;
$s += $typeLength = strlen($type);
unset($headers['type']);
}
if (isset($headers['user-id'])) {

if ($userId = $headers['user-id'] ?? null) {
$flags |= 16;
$userId = $headers['user-id'];
$s += 1;
$s += $userIdLength = strlen($userId);
unset($headers['user-id']);
}
if (isset($headers['app-id'])) {

if ($appId = $headers['app-id'] ?? null) {
$flags |= 8;
$appId = $headers['app-id'];
$s += 1;
$s += $appIdLength = strlen($appId);
unset($headers['app-id']);
}
if (isset($headers['cluster-id'])) {

if ($clusterId = $headers['cluster-id'] ?? null) {
$flags |= 4;
$clusterId = $headers['cluster-id'];
$s += 1;
$s += $clusterIdLength = strlen($clusterId);
unset($headers['cluster-id']);
}

if (!empty($headers)) {
$flags |= 8192;
$this->writer->appendTable($headers, $headersBuffer = new Buffer());
Expand Down Expand Up @@ -1849,7 +1847,7 @@ public function awaitConfirmSelectOk(int $channel): Protocol\MethodConfirmSelect
return await($deferred->promise());
}

public function startHeathbeatTimer(): void
public function startHeartbeatTimer(): void
{
$this->heartbeatTimer = Loop::addTimer($this->options['heartbeat'], [$this, 'onHeartbeat']);
$this->connection->on('drain', [$this, 'onHeartbeat']);
Expand All @@ -1858,7 +1856,7 @@ public function startHeathbeatTimer(): void
/**
* Callback when heartbeat timer timed out.
*/
public function onHeartbeat()
public function onHeartbeat(): void
{
$now = microtime(true);
$nextHeartbeat = ($this->lastWrite ?: $now) + $this->options['heartbeat'];
Expand Down
Loading