Skip to content

Commit

Permalink
Fix: Unsubscribe from transport on closing connection (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Apr 22, 2022
1 parent e76d458 commit 3cef130
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 9 deletions.
11 changes: 8 additions & 3 deletions src/Hub/Controller/SubscribeController.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ function () use ($lastEventId, $stream, $subscribedTopics, $allowedTopics) {

async(
function () use ($stream, $subscribedTopics, $allowedTopics) {
$this->hub->subscribe(function (Update $update) use ($stream, $subscribedTopics, $allowedTopics) {
$this->sendUpdate($update, $stream, $subscribedTopics, $allowedTopics);
});
$callback = fn(Update $update) => $this->sendUpdate(
$update,
$stream,
$subscribedTopics,
$allowedTopics
);
$this->hub->subscribe($callback);
$stream->on('close', fn() => $this->hub->unsubscribe($callback));
}
)();

Expand Down
5 changes: 5 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public function subscribe(callable $callback): void
$this->transport->subscribe($callback);
}

public function unsubscribe(callable $callback): void
{
$this->transport->unsubscribe($callback);
}

public function reconciliate(string $lastEventID): Generator
{
return $this->transport->reconciliate($lastEventID);
Expand Down
5 changes: 5 additions & 0 deletions src/Hub/Transport/PHP/PHPTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public function subscribe(callable $callback): void
$this->eventEmitter->on('mercureUpdate', $callback);
}

public function unsubscribe(callable $callback): void
{
$this->eventEmitter->removeListener('mercureUpdate', $callback);
}

public function reconciliate(string $lastEventID): Generator
{
$yield = self::EARLIEST === $lastEventID;
Expand Down
16 changes: 13 additions & 3 deletions src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Freddie\Hub\Transport\Redis;

use Clue\React\Redis\Client;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Generator;
Expand All @@ -24,6 +26,7 @@ public function __construct(
public readonly Client $subscriber,
public readonly Client $redis,
private RedisSerializer $serializer = new RedisSerializer(),
private EventEmitterInterface $eventEmitter = new EventEmitter(),
private int $size = 0,
private float $trimInterval = 0.0,
) {
Expand All @@ -32,9 +35,12 @@ public function __construct(
public function subscribe(callable $callback): void
{
$this->init();
$this->subscriber->on('message', function (string $channel, string $payload) use ($callback) {
$callback($this->serializer->deserialize($payload));
});
$this->eventEmitter->on('mercureUpdate', $callback);
}

public function unsubscribe(callable $callback): void
{
$this->eventEmitter->removeListener('mercureUpdate', $callback);
}

public function publish(Update $update): PromiseInterface
Expand Down Expand Up @@ -84,6 +90,10 @@ private function init(): void
}

$this->subscriber->subscribe($this->channel); // @phpstan-ignore-line
$this->subscriber->on('message', function (string $channel, string $payload) {
$this->eventEmitter->emit('mercureUpdate', [$this->serializer->deserialize($payload)]);
});

if ($this->trimInterval > 0) {
Loop::addPeriodicTimer(
$this->trimInterval,
Expand Down
2 changes: 2 additions & 0 deletions src/Hub/Transport/TransportInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public function publish(Update $update): PromiseInterface;

public function subscribe(callable $callback): void;

public function unsubscribe(callable $callback): void;

/**
* @param string $lastEventID
* @return Generator<Update>
Expand Down
33 changes: 33 additions & 0 deletions tests/Unit/Hub/Controller/SubscribeControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,36 @@
AccessDeniedHttpException::class,
'Error while decoding from Base64Url, invalid base64 characters detected'
);


it('unsubscribes from transport whenever connection closes', function () {
$transport = new PHPTransport(size: 1000);
$controller = new SubscribeController();
$controller->setHub(new Hub(transport: $transport));
$stream = new ThroughStreamStub();

// Given
$hey = new Message(data: 'Hey!');
$hello = new Message(data: 'Hello');
$world = new Message(data: 'World!');
$transport->publish(new Update(['/bar'], $hey)); // Should not be dumped into stream
$transport->publish(new Update(['/foo'], $hello));
$request = new ServerRequest(
'GET',
'/.well-known/mercure?topic=/foo',
['Last-Event-ID' => 'earliest'],
);

// When
$response = $controller($request, $stream);
Loop::addTimer(0.01, fn () => Loop::stop());
Loop::futureTick(fn () => $stream->close());
Loop::futureTick(fn () => $transport->publish(new Update(['/foo'], $world)));
Loop::run();

// Then
expect($response->getStatusCode())->toBe(200);
expect($response->getHeaderLine('Content-Type'))->toBe('text/event-stream');
expect($stream->storage)->toHaveCount(1);
expect($stream->storage[0])->toBe((string) $hello);
});
2 changes: 1 addition & 1 deletion tests/Unit/Hub/Controller/ThroughStreamStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public function end($data = null): void

public function close(): void
{
throw new ShouldNotHappen(new LogicException(__METHOD__));
$this->emit('close');
}
}
7 changes: 7 additions & 0 deletions tests/Unit/Hub/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public function subscribe(callable $callback): void
$this->called['subscribe'] = func_get_args();
}

public function unsubscribe(callable $callback): void
{
$this->called['unsubscribe'] = func_get_args();
}

public function reconciliate(string $lastEventID): Generator
{
$this->called['reconciliate'] = func_get_args();
Expand All @@ -49,11 +54,13 @@ public function reconciliate(string $lastEventID): Generator
$hub->publish($update);
$hub->subscribe($subscribeFn);
iterator_to_array($hub->reconciliate($lastEventId));
$hub->unsubscribe($subscribeFn);

// Then
expect($transport->called['publish'])->toBe([$update]);
expect($transport->called['subscribe'])->toBe([$subscribeFn]);
expect($transport->called['reconciliate'])->toBe([$lastEventId]);
expect($transport->called['unsubscribe'])->toBe([$subscribeFn]);
});

it('complains when requesting an unrecognized option', function () {
Expand Down
14 changes: 13 additions & 1 deletion tests/Unit/Hub/Transport/PHP/PHPTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@
$transport = new PHPTransport();
$update = new Update(['/foo'], new Message());
$subscriber = (object) ['received' => null];
$transport->subscribe(fn ($receivedUpdate) => $subscriber->received = $receivedUpdate);
$callback = fn($receivedUpdate) => $subscriber->received = $receivedUpdate;
$transport->subscribe($callback);

// When
$transport->publish($update);

// Then
expect($subscriber->received ?? null)->toBe($update);

// When
$transport->unsubscribe($callback);
$transport->publish(new Update(['/foo'], new Message('bar')));

// Then
expect($subscriber->received ?? null)->toBe($update);
});

Expand Down
8 changes: 7 additions & 1 deletion tests/Unit/Hub/Transport/Redis/RedisTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
// Given
$subscriber = (object) ['received' => null];
$update = new Update(['/foo'], new Message('bar'));
$callback = fn($receivedUpdate) => $subscriber->received = $receivedUpdate;

// When
$transport->subscribe(fn($receivedUpdate) => $subscriber->received = $receivedUpdate);
$transport->subscribe($callback);
$transport->publish($update);

// Then
expect($subscriber->received ?? null)->not()->toBe($update); // Because serialization/deserialization
expect($subscriber->received ?? null)->toEqual($update);

// When
$transport->unsubscribe($callback);
$transport->publish(new Update(['/foo'], new Message('foobar')));
expect($subscriber->received ?? null)->toEqual($update);
});

it('performs state reconciliation', function () {
Expand Down

0 comments on commit 3cef130

Please sign in to comment.