Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a memory limit option #225

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Pool implements ArrayAccess

protected $maxTaskPayloadInBytes = 100000;

protected $memory_limit = 0;

public function __construct()
{
if (static::isSupported()) {
Expand All @@ -68,6 +70,46 @@ function_exists('pcntl_async_signals')
&& function_exists('proc_open')
&& ! self::$forceSynchronous;
}

/**
* Converts a human-readable size to bytes. Useful to convert memory units from php.ini to bytes.
*
* Examples...
* echo static::bytes('200K'); // 204800
* echo static::bytes('5MiB'); // 5242880
* echo static::bytes('1000'); // 1000
* echo static::bytes('2.5GB'); // 2684354560
*/
public static function bytes(string|int $size = 0):?int
{
$size = trim(strtoupper((string) $size));

// Construct a full list of acceptable byte units...
$byte_units = ['K'=>10, 'M'=>20, 'G'=>30, 'T'=>40, 'E'=>60, 'Z'=>70, 'Y'=>80];
$accepted = ['B'];
foreach(array_keys($byte_units) as $byte_unit)
{
$accepted[] = $byte_unit;
$accepted[] = $byte_unit . 'I';
$accepted[] = $byte_unit . 'B';
$accepted[] = $byte_unit . 'IB';
}

//check if the given unit is accepted...
$pattern = '/^([0-9]+(?:\.[0-9]+)?)('. implode('|', $accepted) .')?$/Di';
if (!preg_match($pattern, $size, $matches))
throw new \Exception('The byte unit size, "'.$size.'", is not recognized.');

//calculate the bytes based on the unit. assume B if no unit specified...
$size = (float) $matches[1];
if (isset($matches[2]) && $matches[2] !== 'B')
{
$unit = substr($matches[2], 0 ,1);
return (int)($size * pow(2, $byte_units[$unit]));
}
else
return (int)$size;
}

public function forceSynchronous(): self
{
Expand Down Expand Up @@ -118,6 +160,34 @@ public function maxTaskPayload(int $maxSizeInBytes): self
return $this;
}

/**
* Defines a memory limit for each asynchronous process in the pool. If a process exceeds this limit, it will crash. Does not affect synchronous processes.
*
* @param int|string|null $memory_limit The memory limit to set for each process (1024, "1024B", "1K", etc). If not supplied, the current memory remaining divided by the concurrency will be used, so as to spread out the memory remaining among all the processes.
* @throws \Exception If no memory limit is provided, and none is currently set
*/
public function memoryLimit(int|string|null $memory_limit = null): self
{
//if memory limit is not supplied, default to the current memory limit - the memory already used...
if ($memory_limit === null)
{
$memory_limit = ini_get('memory_limit');
if ($memory_limit == -1)
$memory_limit = 0;
else
$memory_limit = static::bytes($memory_limit);

if ($memory_limit <= 0)
throw new \Exception('No PHP memory limit is set. You must set one first, or provide one.');

$this->memory_limit = (int)(($memory_limit - memory_get_usage()) / $this->concurrency);
}
else
$this->memory_limit = static::bytes($memory_limit);

return $this;
}

public function notify()
{
if (count($this->inProgress) >= $this->concurrency) {
Expand Down Expand Up @@ -150,7 +220,8 @@ public function add($process, ?int $outputLength = null): Runnable
$process,
$outputLength,
$this->binary,
$this->maxTaskPayloadInBytes
$this->maxTaskPayloadInBytes,
$this->memory_limit
);
}

Expand Down
20 changes: 12 additions & 8 deletions src/Runtime/ParentRuntime.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static function init(string $autoloader = null)
*
* @return \Spatie\Async\Process\Runnable
*/
public static function createProcess($task, ?int $outputLength = null, ?string $binary = 'php', ?int $max_input_size = 100000): Runnable
public static function createProcess($task, ?int $outputLength = null, ?string $binary = 'php', ?int $max_input_size = 100000, ?int $memory_limit = 0): Runnable
{
if (! self::$isInitialised) {
self::init();
Expand All @@ -63,13 +63,17 @@ public static function createProcess($task, ?int $outputLength = null, ?string $
return SynchronousProcess::create($task, self::getId());
}

$process = new Process([
$binary,
self::$childProcessScript,
self::$autoloader,
self::encodeTask($task, $max_input_size),
$outputLength,
]);
$command = [$binary];
if ($memory_limit > 0) {
$command[] = '--define';
$command[] = 'memory_limit='.$memory_limit;
}
$command[] = self::$childProcessScript;
$command[] = self::$autoloader;
$command[] = self::encodeTask($task, $max_input_size);
$command[] = $outputLength;

$process = new Process($command);

return ParallelProcess::create($process, self::getId());
}
Expand Down
Loading