diff --git a/src/Channel.php b/src/Channel.php index 754ba8d..c395a3f 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -81,11 +81,9 @@ class Channel implements ChannelInterface, EventEmitterInterface /** @var Buffer */ private $bodyBuffer; - /** @var int */ - private $state = ChannelStateEnum::READY; + private ChannelState $state = ChannelState::Ready; - /** @var int */ - private $mode = ChannelModeEnum::REGULAR; + private ChannelMode $mode = ChannelMode::Regular; /** @var Deferred */ private $closeDeferred; @@ -119,10 +117,8 @@ public function getChannelId(): int /** * Returns the channel mode. - * - * @return int */ - public function getMode(): int + public function getMode(): ChannelMode { return $this->mode; } @@ -165,7 +161,7 @@ public function removeReturnListener(callable $callback) */ public function addAckListener(callable $callback) { - if ($this->mode !== ChannelModeEnum::CONFIRM) { + if ($this->mode !== ChannelMode::Confirm) { throw new ChannelException("Ack/nack listener can be added when channel in confirm mode."); } @@ -182,7 +178,7 @@ public function addAckListener(callable $callback) */ public function removeAckListener(callable $callback) { - if ($this->mode !== ChannelModeEnum::CONFIRM) { + if ($this->mode !== ChannelMode::Confirm) { throw new ChannelException("Ack/nack listener can be removed when channel in confirm mode."); } @@ -202,16 +198,16 @@ public function removeAckListener(callable $callback) */ public function close(int $replyCode = 0, string $replyText = ""): void { - if ($this->state === ChannelStateEnum::CLOSED) { + if ($this->state === ChannelState::Closed) { throw new ChannelException("Trying to close already closed channel #{$this->channelId}."); } - if ($this->state === ChannelStateEnum::CLOSING) { + if ($this->state === ChannelState::Closing) { await($this->closePromise); return; } - $this->state = ChannelStateEnum::CLOSING; + $this->state = ChannelState::Closing; $this->connection->channelClose($this->channelId, $replyCode, 0, 0, $replyText); $this->closeDeferred = new Deferred(); @@ -279,12 +275,12 @@ public function get(string $queue = "", bool $noAck = false): Message|null return null; } elseif ($response instanceof MethodBasicGetOkFrame) { - $this->state = ChannelStateEnum::AWAITING_HEADER; + $this->state = ChannelState::AwaitingHeader; $headerFrame = $this->connection->awaitContentHeader($this->getChannelId()); $this->headerFrame = $headerFrame; $this->bodySizeRemaining = $headerFrame->bodySize; - $this->state = ChannelStateEnum::AWAITING_BODY; + $this->state = ChannelState::AwaitingBody; while ($this->bodySizeRemaining > 0) { $bodyFrame = $this->connection->awaitContentBody($this->getChannelId()); @@ -293,13 +289,13 @@ public function get(string $queue = "", bool $noAck = false): Message|null $this->bodySizeRemaining -= $bodyFrame->payloadSize; if ($this->bodySizeRemaining < 0) { - $this->state = ChannelStateEnum::ERROR; + $this->state = ChannelState::Error; $this->connection->disconnect(Constants::STATUS_SYNTAX_ERROR, $errorMessage = "Body overflow, received " . (-$this->bodySizeRemaining) . " more bytes."); throw new ChannelException($errorMessage); } } - $this->state = ChannelStateEnum::READY; + $this->state = ChannelState::Ready; $message = new Message( null, @@ -327,7 +323,7 @@ public function publish($body, array $headers = [], string $exchange = '', strin { $response = $this->publishImpl($body, $headers, $exchange, $routingKey, $mandatory, $immediate); - if ($this->mode === ChannelModeEnum::CONFIRM) { + if ($this->mode === ChannelMode::Confirm) { return ++$this->deliveryTag; } else { return $response; @@ -349,12 +345,12 @@ public function cancel(string $consumerTag, bool $nowait = false): bool|\Bunny\P */ public function txSelect(): \Bunny\Protocol\MethodTxSelectOkFrame { - if ($this->mode !== ChannelModeEnum::REGULAR) { + if ($this->mode !== ChannelMode::Regular) { throw new ChannelException("Channel not in regular mode, cannot change to transactional mode."); } $response = $this->txSelectImpl(); - $this->mode = ChannelModeEnum::TRANSACTIONAL; + $this->mode = ChannelMode::Transactional; return $response; } @@ -364,7 +360,7 @@ public function txSelect(): \Bunny\Protocol\MethodTxSelectOkFrame */ public function txCommit(): \Bunny\Protocol\MethodTxCommitOkFrame { - if ($this->mode !== ChannelModeEnum::TRANSACTIONAL) { + if ($this->mode !== ChannelMode::Transactional) { throw new ChannelException("Channel not in transactional mode, cannot call 'tx.commit'."); } @@ -376,7 +372,7 @@ public function txCommit(): \Bunny\Protocol\MethodTxCommitOkFrame */ public function txRollback(): \Bunny\Protocol\MethodTxRollbackOkFrame { - if ($this->mode !== ChannelModeEnum::TRANSACTIONAL) { + if ($this->mode !== ChannelMode::Transactional) { throw new ChannelException("Channel not in transactional mode, cannot call 'tx.rollback'."); } @@ -388,7 +384,7 @@ public function txRollback(): \Bunny\Protocol\MethodTxRollbackOkFrame */ public function confirmSelect(callable $callback = null, bool $nowait = false): \Bunny\Protocol\MethodConfirmSelectOkFrame { - if ($this->mode !== ChannelModeEnum::REGULAR) { + if ($this->mode !== ChannelMode::Regular) { throw new ChannelException("Channel not in regular mode, cannot change to transactional mode."); } @@ -400,7 +396,7 @@ public function confirmSelect(callable $callback = null, bool $nowait = false): private function enterConfirmMode(callable $callback = null): void { - $this->mode = ChannelModeEnum::CONFIRM; + $this->mode = ChannelMode::Confirm; $this->deliveryTag = 0; if ($callback) { @@ -415,26 +411,26 @@ private function enterConfirmMode(callable $callback = null): void */ public function onFrameReceived(AbstractFrame $frame): void { - if ($this->state === ChannelStateEnum::ERROR) { + if ($this->state === ChannelState::Error) { throw new ChannelException("Channel in error state."); } - if ($this->state === ChannelStateEnum::CLOSED) { + if ($this->state === ChannelState::Closed) { throw new ChannelException("Received frame #{$frame->type} on closed channel #{$this->channelId}."); } if ($frame instanceof MethodFrame) { - if ($this->state === ChannelStateEnum::CLOSING && !($frame instanceof MethodChannelCloseOkFrame)) { + if ($this->state === ChannelState::Closing && !($frame instanceof MethodChannelCloseOkFrame)) { // drop frames in closing state return; - } elseif ($this->state !== ChannelStateEnum::READY && !($frame instanceof MethodChannelCloseOkFrame)) { + } elseif ($this->state !== ChannelState::Ready && !($frame instanceof MethodChannelCloseOkFrame)) { $currentState = $this->state; - $this->state = ChannelStateEnum::ERROR; + $this->state = ChannelState::Error; - if ($currentState === ChannelStateEnum::AWAITING_HEADER) { + if ($currentState === ChannelState::AwaitingHeader) { $msg = "Got method frame, expected header frame."; - } elseif ($currentState === ChannelStateEnum::AWAITING_BODY) { + } elseif ($currentState === ChannelState::AwaitingBody) { $msg = "Got method frame, expected body frame."; } else { throw new \LogicException("Unhandled channel state."); @@ -446,7 +442,7 @@ public function onFrameReceived(AbstractFrame $frame): void } if ($frame instanceof MethodChannelCloseOkFrame) { - $this->state = ChannelStateEnum::CLOSED; + $this->state = ChannelState::Closed; if ($this->closeDeferred !== null) { $this->closeDeferred->resolve($this->channelId); @@ -459,11 +455,11 @@ public function onFrameReceived(AbstractFrame $frame): void } elseif ($frame instanceof MethodBasicReturnFrame) { $this->returnFrame = $frame; - $this->state = ChannelStateEnum::AWAITING_HEADER; + $this->state = ChannelState::AwaitingHeader; } elseif ($frame instanceof MethodBasicDeliverFrame) { $this->deliverFrame = $frame; - $this->state = ChannelStateEnum::AWAITING_HEADER; + $this->state = ChannelState::AwaitingHeader; } elseif ($frame instanceof MethodBasicAckFrame) { foreach ($this->ackCallbacks as $callback) { @@ -482,17 +478,17 @@ public function onFrameReceived(AbstractFrame $frame): void } } elseif ($frame instanceof ContentHeaderFrame) { - if ($this->state === ChannelStateEnum::CLOSING) { + if ($this->state === ChannelState::Closing) { // drop frames in closing state return; - } elseif ($this->state !== ChannelStateEnum::AWAITING_HEADER) { + } elseif ($this->state !== ChannelState::AwaitingHeader) { $currentState = $this->state; - $this->state = ChannelStateEnum::ERROR; + $this->state = ChannelState::Error; - if ($currentState === ChannelStateEnum::READY) { + if ($currentState === ChannelState::Ready) { $msg = "Got header frame, expected method frame."; - } elseif ($currentState === ChannelStateEnum::AWAITING_BODY) { + } elseif ($currentState === ChannelState::AwaitingBody) { $msg = "Got header frame, expected content frame."; } else { throw new \LogicException("Unhandled channel state."); @@ -507,24 +503,24 @@ public function onFrameReceived(AbstractFrame $frame): void $this->bodySizeRemaining = $frame->bodySize; if ($this->bodySizeRemaining > 0) { - $this->state = ChannelStateEnum::AWAITING_BODY; + $this->state = ChannelState::AwaitingBody; } else { - $this->state = ChannelStateEnum::READY; + $this->state = ChannelState::Ready; $this->onBodyComplete(); } } elseif ($frame instanceof ContentBodyFrame) { - if ($this->state === ChannelStateEnum::CLOSING) { + if ($this->state === ChannelState::Closing) { // drop frames in closing state return; - } elseif ($this->state !== ChannelStateEnum::AWAITING_BODY) { + } elseif ($this->state !== ChannelState::AwaitingBody) { $currentState = $this->state; - $this->state = ChannelStateEnum::ERROR; + $this->state = ChannelState::Error; - if ($currentState === ChannelStateEnum::READY) { + if ($currentState === ChannelState::Ready) { $msg = "Got body frame, expected method frame."; - } elseif ($currentState === ChannelStateEnum::AWAITING_HEADER) { + } elseif ($currentState === ChannelState::AwaitingHeader) { $msg = "Got body frame, expected header frame."; } else { throw new \LogicException("Unhandled channel state."); @@ -539,11 +535,11 @@ public function onFrameReceived(AbstractFrame $frame): void $this->bodySizeRemaining -= $frame->payloadSize; if ($this->bodySizeRemaining < 0) { - $this->state = ChannelStateEnum::ERROR; + $this->state = ChannelState::Error; $this->connection->disconnect(Constants::STATUS_SYNTAX_ERROR, "Body overflow, received " . (-$this->bodySizeRemaining) . " more bytes."); } elseif ($this->bodySizeRemaining === 0) { - $this->state = ChannelStateEnum::READY; + $this->state = ChannelState::Ready; $this->onBodyComplete(); } diff --git a/src/ChannelInterface.php b/src/ChannelInterface.php index 3e65b22..4333a2e 100644 --- a/src/ChannelInterface.php +++ b/src/ChannelInterface.php @@ -19,10 +19,9 @@ interface ChannelInterface { /** * Returns the channel mode. - * - * @return int */ - public function getMode(): int; + public function getMode(): ChannelMode; + /** * Listener is called whenever 'basic.return' frame is received with arguments (Message $returnedMessage, MethodBasicReturnFrame $frame) * diff --git a/src/ChannelModeEnum.php b/src/ChannelMode.php similarity index 79% rename from src/ChannelModeEnum.php rename to src/ChannelMode.php index e8f0a0e..1986c79 100644 --- a/src/ChannelModeEnum.php +++ b/src/ChannelMode.php @@ -9,24 +9,21 @@ * * @author Jakub Kulhan */ -final class ChannelModeEnum +enum ChannelMode { /** * Regular AMQP guarantees of published messages delivery. */ - const REGULAR = 1; + case Regular; /** * Messages are published after 'tx.commit'. */ - const TRANSACTIONAL = 2; + case Transactional; /** * Broker sends asynchronously 'basic.ack's for delivered messages. */ - const CONFIRM = 3; - - - + case Confirm; } diff --git a/src/ChannelStateEnum.php b/src/ChannelState.php similarity index 76% rename from src/ChannelStateEnum.php rename to src/ChannelState.php index e968791..2b3c903 100644 --- a/src/ChannelStateEnum.php +++ b/src/ChannelState.php @@ -9,37 +9,35 @@ * * @author Jakub Kulhan */ -final class ChannelStateEnum +enum ChannelState { - /** * Channel is ready to receive messages. */ - const READY = 1; + case Ready; /** * Channel got method that is followed by header/content frames and now waits for header frame. */ - const AWAITING_HEADER = 2; + case AwaitingHeader; /** * Channel got method and header frame and now waits for body frame. */ - const AWAITING_BODY = 3; + case AwaitingBody; /** * An error occurred on channel. */ - const ERROR = 4; + case Error; /** * Channel is being closed. */ - const CLOSING = 5; + case Closing; /** * Channel has received channel.close-ok frame. */ - const CLOSED = 6; - + case Closed; } diff --git a/src/Client.php b/src/Client.php index a74e770..66f1974 100644 --- a/src/Client.php +++ b/src/Client.php @@ -42,9 +42,8 @@ class Client implements ClientInterface private readonly array $options; private readonly Connector $connector; - - /** @var int */ - private int $state = ClientStateEnum::NOT_CONNECTED; + + private ClientState $state = ClientState::NotConnected; private ?Connection $connection = null; @@ -134,7 +133,7 @@ public function __construct(array $options = []) $this->connector = new Connector($this->options); - $this->state = ClientStateEnum::NOT_CONNECTED; + $this->state = ClientState::NotConnected; $this->channels = new Channels(); } @@ -163,7 +162,7 @@ public function channel(): ChannelInterface return $channel; } - $this->state = ClientStateEnum::ERROR; + $this->state = ClientState::Error; throw new ClientException( 'channel.open unexpected response of type ' . gettype($response) . '.' @@ -177,11 +176,11 @@ public function channel(): ChannelInterface */ public function connect(): self { - if ($this->state !== ClientStateEnum::NOT_CONNECTED) { + if ($this->state !== ClientState::NotConnected) { throw new ClientException('Client already connected/connecting.'); } - $this->state = ClientStateEnum::CONNECTING; + $this->state = ClientState::Connecting; $streamScheme = 'tcp'; if (isset($this->options['tls']) && is_array($this->options['tls'])) { @@ -213,7 +212,7 @@ public function connect(): self $this->connection->connectionOpen($this->options['vhost']); $this->connection->startHeathbeatTimer(); - $this->state = ClientStateEnum::CONNECTED; + $this->state = ClientState::Connected; } catch (\Throwable $thrown) { throw new ClientException('Could not connect to ' . $uri . ': ' . $thrown->getMessage(), $thrown->getCode(), $thrown); } @@ -247,15 +246,15 @@ protected function authResponse(MethodConnectionStartFrame $start): void */ public function disconnect(int $replyCode = 0, string $replyText = ''): void { - if ($this->state === ClientStateEnum::DISCONNECTING) { + if ($this->state === ClientState::Disconnecting) { return; } - if ($this->state !== ClientStateEnum::CONNECTED) { + if ($this->state !== ClientState::Connected) { throw new ClientException('Client is not connected.'); } - $this->state = ClientStateEnum::DISCONNECTING; + $this->state = ClientState::Disconnecting; $promises = []; foreach ($this->channels->all() as $channelId => $channel) { @@ -267,7 +266,7 @@ public function disconnect(int $replyCode = 0, string $replyText = ''): void $this->connection->disconnect($replyCode, $replyText); - $this->state = ClientStateEnum::NOT_CONNECTED; + $this->state = ClientState::NotConnected; } /** @@ -275,7 +274,7 @@ public function disconnect(int $replyCode = 0, string $replyText = ''): void */ public function isConnected(): bool { - return $this->state !== ClientStateEnum::NOT_CONNECTED && $this->state !== ClientStateEnum::ERROR; + return $this->state !== ClientState::NotConnected && $this->state !== ClientState::Error; } /** diff --git a/src/ClientStateEnum.php b/src/ClientState.php similarity index 75% rename from src/ClientStateEnum.php rename to src/ClientState.php index 0686505..764b612 100644 --- a/src/ClientStateEnum.php +++ b/src/ClientState.php @@ -9,32 +9,32 @@ * * @author Jakub Kulhan */ -final class ClientStateEnum +enum ClientState { /** * Client is not connected. Method connect() hasn't been called yet. */ - const NOT_CONNECTED = 0; + case NotConnected; /** * Client is currently connecting to AMQP server. */ - const CONNECTING = 1; + case Connecting; /** * Client is connected and ready to communicate. */ - const CONNECTED = 2; + case Connected; /** * Client is currently disconnecting from AMQP server. */ - const DISCONNECTING = 3; + case Disconnecting; /** * An error has occurred. */ - const ERROR = 4; + case Error; }