Skip to content

Commit

Permalink
Add Thread\Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 24, 2024
1 parent a60d9c4 commit 92609a0
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
# Ignore project files.
/Dockerfile.bak
/html/*
/.idea
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
},
"require": {
"php": ">=8.1",
"ext-swoole": ">=5.1"
"ext-swoole": ">=5.1",
"nikic/php-parser": "^5.2"
},
"require-dev": {
"ext-sockets": "*",
Expand Down
26 changes: 26 additions & 0 deletions examples/thread/pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php
/**
* This file is part of Swoole.
*
* @link https://www.swoole.com
* @contact [email protected]
* @license https://github.com/swoole/library/blob/master/LICENSE
*/

declare(strict_types=1);


use Swoole\Thread\Pool;
use tests\TestThread;

require_once dirname(__DIR__, 2) . '/vendor/autoload.php';
require_once dirname(__DIR__) . '/bootstrap.php';

$map = new Swoole\Thread\Map();

(new Pool(TestThread::class, 4))
->withAutoloader(dirname(__DIR__, 2) . '/vendor/autoload.php')
->withClassDefinitionFile(__DIR__ . '/TestThread.php')
->withArguments([uniqid(), $map])
->start();

3 changes: 3 additions & 0 deletions src/__init__.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@
'core/NameResolver/Redis.php',
'core/NameResolver/Nacos.php',
'core/NameResolver/Consul.php',
# <core for Thread> #
'core/Thread/Pool.php',
'core/Thread/Runnable.php',
# <core for functions> #
'core/Coroutine/functions.php',
# <ext> #
Expand Down
183 changes: 183 additions & 0 deletions src/core/Thread/Pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
<?php
/**
* This file is part of Swoole.
*
* @link https://www.swoole.com
* @contact [email protected]
* @license https://github.com/swoole/library/blob/master/LICENSE
*/

declare(strict_types=1);

namespace Swoole\Thread;

use PhpParser\Error;
use PhpParser\ParserFactory;
use Swoole\Thread;

class Pool
{
private array $threads = [];
private string $autoloader = '';
private string $classDefinitionFile = '';
private string $runnableClass = '';
private int $threadNum = 0;
private string $proxyFile;
private array $arguments = [];
private object $running;
private object $queue;

public function __construct(string $runnableClass, int $threadNum)
{
if ($threadNum <= 0) {
throw new \Exception("threadNum must be greater than 0");
}
$this->runnableClass = $runnableClass;
$this->threadNum = $threadNum;
}

protected function isValidPhpFile($filePath): bool
{
$allowedNodeTypes = [
\PhpParser\Node\Stmt\Class_::class,
\PhpParser\Node\Stmt\Const_::class,
\PhpParser\Node\Stmt\Use_::class,
\PhpParser\Node\Stmt\Namespace_::class,
\PhpParser\Node\Stmt\Declare_::class,
];

$parser = (new ParserFactory())->createForNewestSupportedVersion();
try {
$code = file_get_contents($filePath);
$stmts = $parser->parse($code);
$skipLine = -1;
foreach ($stmts as $stmt) {
$isAllowed = false;
foreach ($allowedNodeTypes as $allowedNodeType) {
if ($stmt instanceof $allowedNodeType) {
$isAllowed = true;
break;
}
}
if (!$isAllowed) {
if ($stmt->getLine() == $skipLine) {
continue;
}
return false;
}
}
} catch (Error $error) {
return false;
}
return true;
}

public function withArguments(array $arguments): static
{
$this->arguments = $arguments;
return $this;
}

public function withAutoloader(string $autoloader): static
{
$this->autoloader = $autoloader;
return $this;
}

public function withClassDefinitionFile(string $classDefinitionFile): static
{
$this->classDefinitionFile = $classDefinitionFile;
return $this;
}

/**
* @param array $arguments
* @return void
* @throws \ReflectionException
*/
function start(array $arguments = []): void
{
if (empty($this->classDefinitionFile) and class_exists($this->runnableClass, false)) {
$file = (new \ReflectionClass($this->runnableClass))->getFileName();
if (!$this->isValidPhpFile($file)) {
throw new \Exception('class definition file must not contain any expressions.');
}
$this->classDefinitionFile = $file;
} elseif ($this->classDefinitionFile) {
require_once $this->classDefinitionFile;
}

if (!class_exists($this->runnableClass)) {
throw new \Exception("class `$this->runnableClass` not found");
}

if (!is_subclass_of($this->runnableClass, Runnable::class)) {
throw new \Exception("class `$this->runnableClass` must implements Thread\\Runnable");
}

if (empty($this->autoloader)) {
$include_files = get_included_files();
foreach ($include_files as $file) {
if (str_ends_with($file, 'vendor/autoload.php')) {
$this->autoloader = $file;
break;
}
}
}
if (empty($this->autoloader)) {
throw new \Exception('autoload file not found');
}

$this->proxyFile = dirname($this->autoloader) . '/thread_runner.php';
if (!is_file($this->proxyFile)) {
$script = '<?php' . PHP_EOL;
$script .= '$arguments = Swoole\Thread::getArguments();' . PHP_EOL;
$script .= '$threadId = Swoole\Thread::getId();' . PHP_EOL;
$script .= '$autoloader = $arguments[0];' . PHP_EOL;
$script .= '$runnableClass = $arguments[1];' . PHP_EOL;
$script .= '$queue = $arguments[2];' . PHP_EOL;
$script .= '$classDefinitionFile = $arguments[3];' . PHP_EOL;
$script .= '$running = $arguments[4];' . PHP_EOL;
$script .= '$threadArguments = array_slice($arguments, 5);' . PHP_EOL;
$script .= 'require_once $autoloader;' . PHP_EOL;
$script .= 'if ($classDefinitionFile) require_once $classDefinitionFile;' . PHP_EOL;
$script .= '$runnable = new $runnableClass($running);' . PHP_EOL;
$script .= 'try { $runnable->run($threadArguments); }' . PHP_EOL;
$script .= 'finally { $queue->push($threadId, Swoole\Thread\Queue::NOTIFY_ONE); }' . PHP_EOL;
$script .= PHP_EOL;
file_put_contents($this->proxyFile, $script);
}

$this->queue = new Queue;

Check failure on line 151 in src/core/Thread/Pool.php

View workflow job for this annotation

GitHub Actions / static-analysis

Instantiated class Swoole\Thread\Queue not found.
$this->running = new Atomic(1);

Check failure on line 152 in src/core/Thread/Pool.php

View workflow job for this annotation

GitHub Actions / static-analysis

Instantiated class Swoole\Thread\Atomic not found.

for ($i = 0; $i < $this->threadNum; $i++) {
$this->createThread();
}

while ($this->running->get()) {
$threadId = $this->queue->pop(-1);
$thread = $this->threads[$threadId];
$thread->join();
unset($this->threads[$threadId]);
$this->createThread();
}

foreach ($this->threads as $thread) {
$thread->join();
}
}

protected function createThread(): void
{
$thread = new Thread($this->proxyFile,

Check failure on line 173 in src/core/Thread/Pool.php

View workflow job for this annotation

GitHub Actions / static-analysis

Instantiated class Swoole\Thread not found.
$this->autoloader,
$this->runnableClass,
$this->queue,
$this->classDefinitionFile,
$this->running,
...$this->arguments
);
$this->threads[$thread->id] = $thread;

Check failure on line 181 in src/core/Thread/Pool.php

View workflow job for this annotation

GitHub Actions / static-analysis

Access to property $id on an unknown class Swoole\Thread.
}
}
34 changes: 34 additions & 0 deletions src/core/Thread/Runnable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php
/**
* This file is part of Swoole.
*
* @link https://www.swoole.com
* @contact [email protected]
* @license https://github.com/swoole/library/blob/master/LICENSE
*/

declare(strict_types=1);

namespace Swoole\Thread;

abstract class Runnable
{
protected Atomic $running;

Check failure on line 16 in src/core/Thread/Runnable.php

View workflow job for this annotation

GitHub Actions / static-analysis

Property Swoole\Thread\Runnable::$running has unknown class Swoole\Thread\Atomic as its type.

public function __construct($running)
{
$this->running = $running;
}

abstract public function run(array $args): void;

protected function isRunning(): bool
{
return $this->running->get() === 1;

Check failure on line 27 in src/core/Thread/Runnable.php

View workflow job for this annotation

GitHub Actions / static-analysis

Call to method get() on an unknown class Swoole\Thread\Atomic.
}

protected function shutdown(): void
{
$this->running->set(0);

Check failure on line 32 in src/core/Thread/Runnable.php

View workflow job for this annotation

GitHub Actions / static-analysis

Call to method set() on an unknown class Swoole\Thread\Atomic.
}
}
23 changes: 23 additions & 0 deletions tests/TestThread.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace Swoole\Tests;

use Swoole\Thread\Runnable;

class TestThread extends Runnable
{
public function run(array $args): void
{
$map = $args[1];
$map->incr('thread', 1);

for ($i = 0; $i < 5; $i++) {
usleep(10000);
$map->incr('sleep');
}

if ($map['sleep'] > 50) {
$this->shutdown();
}
}
}
36 changes: 36 additions & 0 deletions tests/unit/Thread/PoolTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* This file is part of Swoole.
*
* @link https://www.swoole.com
* @contact [email protected]
* @license https://github.com/swoole/library/blob/master/LICENSE
*/

declare(strict_types=1);

namespace Swoole\Thread;

use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\TestCase;
use Swoole\Tests\TestThread;

/**
* @internal
*/
#[CoversClass(Pool::class)]
class PoolTest extends TestCase
{
public function testPool(): void
{
$map = new Map();

(new Pool(TestThread::class, 4))
->withClassDefinitionFile(dirname(__DIR__, 2) . '/TestThread.php')
->withArguments([uniqid(), $map])
->start();

$this->assertEquals($map['sleep'], 65);
$this->assertEquals($map['thread'], 13);
}
}

0 comments on commit 92609a0

Please sign in to comment.