Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
- Added heartbeat callback to the configuration
Browse files Browse the repository at this point in the history
- Small refactoring and speed improvements
  • Loading branch information
bckp committed Jul 20, 2022
1 parent b1f263e commit 39c56d4
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 56 deletions.
1 change: 1 addition & 0 deletions .docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rabbitmq:
host: localhost
port: 5672
lazy: false
heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send
queues:
testQueue:
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ For details on how to use this package, check out our [documentation](.docs).

| State | Version | Branch | Nette | PHP |
|--------|-----------|----------|----------|---------|
| dev | `^10.x.x` | `master` | `3.0+` | `>=8.0` |
| stable | `^10.0.0` | `master` | `3.0+` | `>=8.0` |
| dev | `^10.2.x` | `master` | `3.0+` | `>=8.0` |
| stable | `^10.1.0` | `master` | `3.0+` | `>=8.0` |

## Development

This package is currently maintaining by these authors.

<a href="https://github.com/bckp"><img width="80" height="80" src="https://avatars.githubusercontent.com/u/179652?v=4&s=80"></a>
<a href="https://github.com/kucix"><img width="80" height="80" src="https://avatars.githubusercontent.com/u/444459?v=4&s=80"></a>
<a href="https://github.com/mallgroup"><img width="80" height="80" src="https://avatars.githubusercontent.com/u/23184995?v=4&s=80"></a>

-----
Expand Down
20 changes: 10 additions & 10 deletions src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ class Client extends BunnyClient
*/
public function __construct(array $options = [])
{
// Construct parent
parent::__construct($options);
$this->options['cycle_callback'] = $options['cycle_callback'];

$this->options['cycle_callback'] = is_callable($options['cycle_callback'])
? $options['cycle_callback']
: null;
$this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback'])
? $options['heartbeat_callback']
: null;
}

/**
Expand All @@ -37,9 +42,7 @@ public function sendHeartbeat(): void
$this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();

if (is_callable($this->options['heartbeat_callback'] ?? null)) {
$this->options['heartbeat_callback']->call($this);
}
$this->options['heartbeat_callback']?->call($this);
}

public function syncDisconnect(): bool
Expand Down Expand Up @@ -73,7 +76,7 @@ protected function write(): void
{
parent::write();
if (($last = error_get_last()) !== null) {
if (!Strings::match($last['message'], '~fwrite(): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) {
if (!Strings::match($last['message'], '~fwrite\(\): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) {
return;
}

Expand Down Expand Up @@ -101,10 +104,7 @@ public function run($maxSeconds = null): void
}

do {
if (is_callable($this->options['cycle_callback'] ?? null)) {
$this->options['cycle_callback']->call($this);
}

$this->options['cycle_callback']?->call($this);
if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public function __construct(
bool $tcpNoDelay,
bool $lazy = false,
?array $ssl = null,
?callable $cycleCallback = null
?callable $cycleCallback = null,
?callable $heartbeatCallback = null,
) {
$this->connectionParams = [
'host' => $host,
Expand All @@ -56,6 +57,7 @@ public function __construct(
'tcp_nodelay' => $tcpNoDelay,
'ssl' => $ssl,
'cycle_callback' => $cycleCallback,
'heartbeat_callback' => $heartbeatCallback,
];

$this->bunnyClient = $this->createNewConnection();
Expand Down
3 changes: 2 additions & 1 deletion src/Connection/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ private function create(string $name): IConnection
$connectionData['tcpNoDelay'],
$connectionData['lazy'],
$connectionData['ssl'],
fn () => $this->sendHeartbeat()
fn () => $this->sendHeartbeat(),
$connectionData['heartbeatCallback'] ?? null,
);
}
}
1 change: 1 addition & 0 deletions src/DI/Helpers/ConnectionsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public function getConfigSchema(): Schema
'tcpNoDelay' => Expect::bool(false),
'lazy' => Expect::bool(true),
'ssl' => Expect::array(null)->required(false),
'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'),
'admin' => Expect::structure([
'port' => Expect::int(15672),
'secure' => Expect::bool(false),
Expand Down
2 changes: 1 addition & 1 deletion src/DI/Helpers/ConsumersHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function getConfigSchema(): Schema
return Expect::arrayOf(
Expect::structure([
'queue' => Expect::string()->required(true),
'callback' => Expect::array()->required(true),
'callback' => Expect::array()->required(true)->assert('is_callable'),
'idleTimeout' => Expect::int(30),
'bulk' => Expect::structure([
'size' => Expect::int()->min(1),
Expand Down
42 changes: 2 additions & 40 deletions src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;
use Nette\Utils\Callback;
use Tracy\Dumper;

final class Producer
{
Expand Down Expand Up @@ -39,11 +37,11 @@ public function publish(string $message, array $headers = [], ?string $routingKe
$headers = array_merge($this->getBasicHeaders(), $headers);

if ($this->queue !== null) {
$this->publishToQueue($message, $headers);
$this->tryPublish($this->queue, $message, $headers, '', $this->queue->getName());
}

if ($this->exchange !== null) {
$this->publishToExchange($message, $headers, $routingKey ?? '');
$this->tryPublish($this->exchange, $message, $headers, $this->exchange->getName(), $routingKey ?? '');
}

foreach ($this->publishCallbacks as $callback) {
Expand All @@ -58,18 +56,6 @@ public function addOnPublishCallback(callable $callback): void
}


public function sendHeartbeat(): void
{
trigger_error(__METHOD__ . '() is deprecated, use dependency ConnectionFactory::sendHeartbeat().', E_USER_DEPRECATED);
if ($this->queue !== null) {
$this->queue->getConnection()->sendHeartbeat();
}
if ($this->exchange !== null) {
$this->exchange->getConnection()->sendHeartbeat();
}
}


/**
* @return array<string, string|int>
*/
Expand All @@ -81,30 +67,6 @@ private function getBasicHeaders(): array
];
}

/**
* @param array<string, string|int> $headers
*/
private function publishToQueue(string $message, array $headers = []): void
{
if (null === $queue = $this->queue) {
throw new \UnexpectedValueException('Queue is not defined');
}

$this->tryPublish($queue, $message, $headers, '', $queue->getName());
}


/**
* @param array<string, string|int> $headers
*/
private function publishToExchange(string $message, array $headers, string $routingKey): void
{
if (null === $exchange = $this->exchange) {
throw new \UnexpectedValueException('Exchange is not defined');
}

$this->tryPublish($exchange, $message, $headers, $exchange->getName(), $routingKey);
}

/**
* @param array<string, string|int> $headers
Expand Down

0 comments on commit 39c56d4

Please sign in to comment.