-
-
Notifications
You must be signed in to change notification settings - Fork 195
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
dda41ce
commit 22cdb63
Showing
4 changed files
with
279 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
<?php | ||
|
||
namespace System\Classes; | ||
|
||
use Illuminate\Support\Facades\Cache; | ||
|
||
/** | ||
* Event stream. | ||
* | ||
* Represents a HTML5 event stream that can be interacted with when using the `withEventStream` method in the | ||
* `System\Traits\ResponseMaker` trait. This class is intended to be used with a front-end library that can interpret | ||
* the event stream and manage and display real-time information and updates. | ||
* | ||
* @author Ben Thomson <[email protected]> | ||
* @copyright 2024 Winter CMS Maintainers | ||
*/ | ||
class EventStream | ||
{ | ||
public function __construct( | ||
protected string $id = '', | ||
protected array $data = [], | ||
protected string $event = 'start', | ||
protected bool $closed = false, | ||
protected bool $changed = true, | ||
protected float $ticks = 1.0, | ||
) { | ||
} | ||
|
||
public function register() | ||
{ | ||
// Generate new ID | ||
do { | ||
$this->id = 'event-stream-' . $this->generateId(); | ||
} while (Cache::has($this->id)); | ||
|
||
// Store the stream in the cache | ||
$this->saveEvent(30); | ||
} | ||
|
||
public static function load(string $id): ?static | ||
{ | ||
if (!Cache::has($id)) { | ||
return null; | ||
} | ||
|
||
$data = Cache::get($id); | ||
|
||
return new static( | ||
id: $id, | ||
data: $data['data'], | ||
event: $data['event'], | ||
closed: $data['closed'], | ||
changed: $data['changed'], | ||
ticks: $data['ticks'], | ||
); | ||
} | ||
|
||
public function getId(): string | ||
{ | ||
return $this->id; | ||
} | ||
|
||
public function getTicks(): float | ||
{ | ||
return $this->ticks; | ||
} | ||
|
||
public function set(string|array $key, mixed $value = null): void | ||
{ | ||
if (is_array($key)) { | ||
$this->data = array_merge($this->data, $key); | ||
} else { | ||
$this->data[$key] = $value; | ||
} | ||
|
||
$this->event = 'update'; | ||
$this->changed = true; | ||
$this->saveEvent(); | ||
} | ||
|
||
public function tick(): void | ||
{ | ||
$this->event = 'ping'; | ||
$this->changed = false; | ||
$this->saveEvent(); | ||
} | ||
|
||
public function reconnect(): void | ||
{ | ||
if ($this->closed) { | ||
return; | ||
} | ||
$this->event = 'reconnect'; | ||
$this->closeStream(); | ||
} | ||
|
||
public function close(): void | ||
{ | ||
if ($this->closed) { | ||
return; | ||
} | ||
|
||
$this->event = 'close'; | ||
$this->closeStream(); | ||
} | ||
|
||
public function isClosed(): bool | ||
{ | ||
return $this->closed; | ||
} | ||
|
||
protected function closeStream(): void | ||
{ | ||
$this->closed = true; | ||
$this->saveEvent(); | ||
} | ||
|
||
protected function saveEvent(int $ttl = 5): void | ||
{ | ||
var_dump(Cache::set($this->id, [ | ||
'event' => $this->event, | ||
'data' => $this->data, | ||
'changed' => $this->changed, | ||
'closed' => $this->closed, | ||
'ticks' => $this->ticks, | ||
], now()->addSeconds($ttl))); | ||
} | ||
|
||
protected function generateId(): string | ||
{ | ||
$id = ''; | ||
for ($i = 0; $i < 32; ++$i) { | ||
$id .= substr('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ', random_int(0, 61), 1); | ||
} | ||
return $id; | ||
} | ||
|
||
public function streamEvent(): string | ||
{ | ||
$data = Cache::get($this->id); | ||
|
||
if ($data['closed']) { | ||
return ''; | ||
} | ||
|
||
$eventData = []; | ||
|
||
if ($data['event'] !== 'ping' || $data['changed']) { | ||
$eventData['time'] = date('c'); | ||
} | ||
if ($data['changed']) { | ||
$eventData['contents'] = $data['data']; | ||
} | ||
|
||
$contents = 'event: ' . $data['event'] . PHP_EOL; | ||
if (count($eventData)) { | ||
$contents .= 'data: ' . json_encode($eventData) . PHP_EOL; | ||
} | ||
$contents .= PHP_EOL; | ||
|
||
return $contents; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
<?php | ||
|
||
namespace System\Controllers; | ||
|
||
use Backend\Classes\Controller; | ||
use Illuminate\Support\Facades\Response; | ||
use Symfony\Component\HttpFoundation\StreamedResponse; | ||
use System\Classes\EventStream as EventStreamInstance; | ||
|
||
/** | ||
* Event stream controller. | ||
* | ||
* Handles the delivery of event-streaming data to the client. | ||
*/ | ||
class EventStream extends Controller | ||
{ | ||
public function register() | ||
{ | ||
$eventStream = new EventStreamInstance(); | ||
$eventStream->register(); | ||
|
||
return Response::json(['id' => $eventStream->getId()]); | ||
} | ||
|
||
public function subscribe(string $id) | ||
{ | ||
$eventStream = EventStreamInstance::load($id); | ||
|
||
if (is_null($eventStream)) { | ||
return Response::make('Event stream not found', 404); | ||
} | ||
|
||
$response = new StreamedResponse(); | ||
|
||
$response->headers->set('Content-Type', 'text/event-stream'); | ||
$response->headers->set('Cache-Control', 'no-cache'); | ||
$response->headers->set('Connection', 'keep-alive'); | ||
|
||
$response->setCallback(function () use ($eventStream) { | ||
while (true) { | ||
if ($eventStream->isClosed()) { | ||
break; | ||
} | ||
|
||
echo $eventStream; | ||
|
||
if (ob_get_level() > 0) { | ||
ob_flush(); | ||
} | ||
|
||
flush(); | ||
|
||
$eventStream->tick(); | ||
|
||
if (connection_aborted()) { | ||
break; | ||
} | ||
|
||
usleep($eventStream->getTicks() * 1000000); | ||
} | ||
}); | ||
|
||
$response->send(); | ||
$this->responseOverride = $response; | ||
return $response; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters