From ba976f72f862149687efe691616347151332bbcc Mon Sep 17 00:00:00 2001 From: ArrayIterator Date: Tue, 17 Oct 2023 12:30:27 +0700 Subject: [PATCH] Bulk updates --- .../ApplicationChecker/TranslatorChecker.php | 7 +- src/Console/Command/SchedulerAction.php | 430 ++++++++++++------ src/Database/Connection.php | 34 -- src/Database/Wrapper/DriverWrapper.php | 9 +- src/Exceptions/Runtime/TimeoutException.php | 8 + .../Adapter/Gettext/PoMoTranslation.php | 1 - src/Lang/default.pot | 159 +++++-- src/Lang/id.po | 196 ++++++-- src/Scheduler/Runner.php | 16 +- src/Scheduler/Scheduler.php | 237 +++++++--- src/Util/Filter/Conversion.php | 51 +++ 11 files changed, 829 insertions(+), 319 deletions(-) create mode 100644 src/Exceptions/Runtime/TimeoutException.php create mode 100644 src/Util/Filter/Conversion.php diff --git a/src/Console/Command/ApplicationChecker/TranslatorChecker.php b/src/Console/Command/ApplicationChecker/TranslatorChecker.php index 91841de..d7c923f 100644 --- a/src/Console/Command/ApplicationChecker/TranslatorChecker.php +++ b/src/Console/Command/ApplicationChecker/TranslatorChecker.php @@ -4,6 +4,7 @@ namespace ArrayAccess\TrayDigita\Console\Command\ApplicationChecker; use ArrayAccess\TrayDigita\Console\Command\Traits\WriterHelperTrait; +use ArrayAccess\TrayDigita\L10n\Languages\Locale; use ArrayAccess\TrayDigita\L10n\Translations\Interfaces\TranslatorInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; @@ -83,12 +84,14 @@ public function check(InputInterface $input, OutputInterface $output): int ); } + $localeInfo = Locale::getInfo($translator->getLanguage()); $this->writeIndent( $output, sprintf( - '%s [%s]', + '%s [%s]%s', $this->translateContext('Current Language', 'console'), - $translator->getLanguage() + $translator->getLanguage(), + $localeInfo ? sprintf(' (%s)', $localeInfo['name']) : '' ), OutputInterface::VERBOSITY_VERBOSE ); diff --git a/src/Console/Command/SchedulerAction.php b/src/Console/Command/SchedulerAction.php index 24bf233..64d77dc 100644 --- a/src/Console/Command/SchedulerAction.php +++ b/src/Console/Command/SchedulerAction.php @@ -7,19 +7,32 @@ use ArrayAccess\TrayDigita\Container\Interfaces\ContainerAllocatorInterface; use ArrayAccess\TrayDigita\Event\Interfaces\ManagerAllocatorInterface; use ArrayAccess\TrayDigita\Exceptions\Runtime\RuntimeException; +use ArrayAccess\TrayDigita\Kernel\Decorator; use ArrayAccess\TrayDigita\Scheduler\Abstracts\Task; +use ArrayAccess\TrayDigita\Scheduler\Runner; use ArrayAccess\TrayDigita\Scheduler\Scheduler; use ArrayAccess\TrayDigita\Traits\Container\ContainerAllocatorTrait; use ArrayAccess\TrayDigita\Traits\Manager\ManagerAllocatorTrait; use ArrayAccess\TrayDigita\Traits\Service\TranslatorTrait; use ArrayAccess\TrayDigita\Util\Filter\Consolidation; use ArrayAccess\TrayDigita\Util\Filter\ContainerHelper; +use ArrayAccess\TrayDigita\Util\Filter\Conversion; +use DateTime; use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Helper\Table; +use Symfony\Component\Console\Helper\TableCell; +use Symfony\Component\Console\Helper\TableCellStyle; +use Symfony\Component\Console\Helper\TableStyle; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use function is_int; -use function max; +use Symfony\Component\Console\Question\ConfirmationQuestion; +use Symfony\Component\Console\Style\SymfonyStyle; +use function date; +use function date_default_timezone_get; +use function is_array; +use function round; +use function spl_object_hash; use function sprintf; class SchedulerAction extends Command implements ContainerAllocatorInterface, ManagerAllocatorInterface @@ -62,16 +75,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int if (!Consolidation::isCli()) { return self::INVALID; } - $isRun = $input->getOption('run'); - $isDirect = !$input->isInteractive() || $output->isQuiet(); - if (!$isRun && $isDirect) { - return self::INVALID; - } - $scheduler = ContainerHelper::service( Scheduler::class, $this->getContainer() ); + if (!$scheduler instanceof Scheduler) { throw new RuntimeException( $this->translateContext( @@ -80,143 +88,299 @@ protected function execute(InputInterface $input, OutputInterface $output): int ) ); } - - $queue = []; - $skipped = []; - foreach ($scheduler->getQueue() as $key => $task) { - if ($scheduler->shouldRun($task)) { - $queue[$key] = $task; - } else { - $skipped[$key] = $task; + try { + if (!$input->getOption('run')) { + return $this->listScheduler($scheduler, $output); } + + return $this->runScheduler($scheduler, $input, $output); + } finally { + $this->printUsage($output); } + } - try { - $countQueue = count($queue); - $countSkipped = count($skipped); - if ($isRun) { - if ($countQueue === 0) { - $output->writeln( - sprintf( - '%s', - $this->translateContext('No task in queue', 'console') - ) - ); - return self::SUCCESS; - } - $output->writeln( - sprintf( - '%s', - sprintf( - $this->translatePluralContext( - 'Executing %s task', - 'Executing %s tasks', - $countQueue, - 'console' - ), - $countQueue - ) - ) - ); - $scheduler->run(); + private function runScheduler(Scheduler $scheduler, InputInterface $input, OutputInterface $output): int + { + $interactive = $input->isInteractive() && ! $output->isQuiet(); + $table = new Table($output); + $table->setStyle('box'); + $table->setColumnMaxWidth(0, 40); + $table->setColumnMaxWidth(1, 40); + $table->setHeaders([ + $this->translateContext('Id', 'console'), + $this->translateContext('Name', 'console'), + $this->translateContext('Status', 'console'), + $this->translateContext('Last Execution', 'console'), + $this->translateContext('Execution Time', 'console'), + ]); + $total = 0; + $queue = $scheduler->getQueueProcessed()['queue']; + $wait = $this->translateContext('Waiting', 'console'); + foreach ($queue as $task) { + $total++; + $record = $scheduler->getRecordLoader()->getRecord($task); + $lastExec = $this->translateContext('Unknown', 'console'); + if ($record) { + $lastExec = date('Y-m-d H:i:s', $record->getLastExecutionTime()); + } + $table->addRow([ + $task->getIdentity(), + $task->getName(), + $wait, + $lastExec, + $wait + ]); + } + if ($total === 0) { + $table->addRow([ + new TableCell( + $this->translateContext('No Schedulers Waiting For Execution', 'console'), + [ + 'colspan' => 5, + 'style' => new TableCellStyle([ + 'align' => 'center', + 'options' => 'bold' + ]) + ] + ) + ]); + $table->render(); + return self::SUCCESS; + } + $table->render(); + $io = new SymfonyStyle($input, $output); + if ($interactive) { + $confirm = new ConfirmationQuestion( + $this->translatePluralContext( + 'Are you sure to execute scheduler ?', + 'Are you sure to execute schedulers ?', + $total, + 'console' + ) + ); + $confirm->setMultiline(false); + if (!$io->askQuestion($confirm)) { return self::SUCCESS; } + $c = 0; + while ($c++ < 4) { + $this->clearLine($output); + } + } - $this->write( - $output, - sprintf( - '%s %s', - $countQueue, - $this->translatePluralContext( - 'scheduler in queue to run', - 'schedulers in queue to run', - $countQueue, - 'console' - ) - ), - $countQueue === 0 ? self::MODE_SUCCESS : self::MODE_WARNING - ); - /** @noinspection DuplicatedCode */ - foreach ($queue as $task) { - $interval = $task->getInterval(); - if (is_int($interval)) { - $every = sprintf( - $this->translateContext('run every %s seconds', 'console'), - sprintf( - '%s', - max($interval, Task::MINIMUM_INTERVAL_TIME) - ) - ); - } else { - $every = sprintf( - $this->translateContext('next executed time %s', 'console'), - sprintf( - '[%s]', - $interval->getNextRunDate()->format('Y-m-d H:i:s e') - ) - ); - } - $message = sprintf( - '%s %s', + // clear + do { + $this->clearLine($output); + } while ($total-- > -4); + + $manager = $scheduler->getManager(); + if (!$manager) { + $manager = Decorator::manager(); + $scheduler->setManager($manager); + } + $progressBar = $io->createProgressBar(); + $progressBar->setMaxSteps($total); + $progressBar->setEmptyBarCharacter('░'); // light shade character \u2591 + $progressBar->setProgressCharacter(''); + $progressBar->setBarCharacter('▓'); // dark shade character \u2593 + $progressBar->setFormat('%current% [%bar%] %elapsed:6s% [%message%]'); + $table->setRows([]); + $manager->attach( + 'scheduler.beforeProcessing', + function ($task, Runner $runner) use ($progressBar) { + $progressBar->setMessage($runner->getTask()->getName()); + } + ); + + $manager->attach( + 'scheduler.afterProcessing', + function ($task, Runner $runner, int $time) use (&$queue) { + $task = $runner->getTask(); + $id = spl_object_hash($task); + $status = match ($runner->getStatusCode()) { + Runner::STATUS_SUCCESS => $this->translateContext('Success', 'console'), + Runner::STATUS_EXITED => $this->translateContext('Exited', 'console'), + Runner::STATUS_STOPPED => $this->translateContext('Stopped', 'console'), + Runner::STATUS_FAILURE => $this->translateContext('Failure', 'console'), + default => $this->translateContext('Unknown', 'console') + }; + $total = $runner->getExecutionDuration(); + $queue[$id] = [ + $task->getIdentity(), $task->getName(), - $every - ); - $this->writeIndent( - $output, - $message, - mode: self::MODE_INFO - ); + $status, + date('Y-m-d H:i:s', $time), + $total !== null ? sprintf( + '%s ms', + round($total, 5) + ) : $this->translateContext('Unknown', 'console') + ]; } + ); - $this->write( - $output, - sprintf( - '%s %s', - $countSkipped, - $this->translatePluralContext( - 'scheduler skipped', - 'schedulers skipped', - $countSkipped, - 'console' - ) - ), - self::MODE_SUCCESS - ); - /** @noinspection DuplicatedCode */ - foreach ($skipped as $task) { - $interval = $task->getInterval(); - if (is_int($interval)) { - $every = sprintf( - $this->translateContext('run every %s seconds', 'console'), - sprintf( - '%s', - max($interval, Task::MINIMUM_INTERVAL_TIME) - ) - ); - } else { - $every = sprintf( - $this->translateContext('next executed time %s', 'console'), - sprintf( - '[%s]', - $interval->getNextRunDate()->format('Y-m-d H:i:s e') - ) - ); + $scheduler->run(); + $progressBar->finish(); + $progressBar->clear(); + foreach ($queue as $item) { + if (!is_array($item)) { + $record = $scheduler->getRecordLoader()->getRecord($item); + $lastExec = $this->translateContext('Unknown', 'console'); + if ($record) { + $lastExec = date('Y-m-d H:i:s', $record->getLastExecutionTime()); } + $item = [ + $item->getIdentity(), + $item->getName(), + $this->translateContext('Skipped', 'console'), + $lastExec, + $this->translateContext('Skipped', 'console'), + ]; + } + $table->addRow($item); + } - $message = sprintf( - '%s %s', - $task->getName(), - $every - ); - $this->writeIndent( - $output, - $message, - mode: self::MODE_INFO - ); + $table->render(); + return self::SUCCESS; + } + + private function clearLine(OutputInterface $output): void + { + $output->write("\r\033[K\033[1A\r\033[K\r"); + } + + private function listScheduler(Scheduler $scheduler, OutputInterface $output): int + { + $table = new Table($output); + $table->setStyle('box'); + $table->setColumnMaxWidth(0, 40); + $table->setColumnMaxWidth(1, 40); + $table->setHeaders([ + $this->translateContext('Id', 'console'), + $this->translateContext('Name', 'console'), + $this->translateContext('Status', 'console'), + $this->translateContext('Last Execution', 'console'), + $this->translateContext('Next Run Date', 'console'), + ]); + $total = 0; + $inQueue = 0; + $skippedCount = 0; + $finishedCount = 0; + foreach ($scheduler->getQueue() as $task) { + $total++; + $interval = $scheduler->getNextRunDate($task); + $skipped = ! $scheduler->shouldRun($task); + $skippedCount += $skipped ? 1 : 0; + $inQueue += !$skipped ? 1 : 0; + $record = $scheduler->getRecordLoader()->getRecord($task); + $lastExec = $this->translateContext('Unknown', 'console'); + if ($record) { + $lastExec = date('Y-m-d H:i:s', $record->getLastExecutionTime()); } - return self::SUCCESS; - } finally { - $this->printUsage($output); + $table->addRow([ + $task->getIdentity(), + $task->getName(), + $skipped + ? $this->translateContext('Skipped', 'console') + : $this->translateContext('Need To Execute', 'console'), + $lastExec, + date('Y-m-d H:i:s', $interval?->getTimestamp()) + ]); + } + + foreach ($scheduler->getFinished() as $runner) { + $finishedCount++; + $total++; + $task = $runner->getTask(); + $interval = $scheduler->getNextRunDate($task); + $lastExec = date('Y-m-d H:i:s', $runner->getLastRecord()->getLastExecutionTime()); + $table->addRow([ + $task->getIdentity(), + $task->getName(), + $this->translateContext('Finished', 'console'), + $lastExec, + date('Y-m-d H:i:s', $interval?->getTimestamp()) + ]); + } + + foreach ($scheduler->getSkipped() as $task) { + $skippedCount++; + $total++; + $interval = $scheduler->getNextRunDate($task); + $record = $scheduler->getRecordLoader()->getRecord($task); + $lastExec = $this->translateContext('Unknown', 'console'); + if ($record) { + $lastExec = date('Y-m-d H:i:s', $record->getLastExecutionTime()); + } + $table->addRow([ + $task->getIdentity(), + $task->getName(), + $this->translateContext('Skipped', 'console'), + $lastExec, + date('Y-m-d H:i:s', $interval?->getTimestamp()) + ]); } + if ($total === 0) { + $table->addRow([ + new TableCell( + $this->translateContext('No Schedulers Registered', 'console'), + [ + 'colspan' => 5, + 'style' => new TableCellStyle([ + 'align' => 'center', + 'options' => 'bold' + ]) + ] + ) + ]); + } + $output->writeln( + sprintf( + '%s: %s', + $this->translateContext( + 'Total', + 'console' + ), + $total + ) + ); + $output->writeln( + sprintf( + '%s: %s', + $this->translateContext('Using Timezone', 'console'), + date_default_timezone_get() + ) + ); + $output->writeln( + sprintf( + '%s: %s', + $this->translateContext('Timezone Offset', 'console'), + Conversion::convertOffsetToSQLTimezone((new DateTime())->getOffset()), + ) + ); + $output->writeln( + sprintf( + '%s: %d', + $this->translateContext('Skipped', 'console'), + $skippedCount + ) + ); + $output->writeln( + sprintf( + '%s: %d', + $this->translateContext('Need To Execute', 'console'), + $inQueue + ) + ); + $output->writeln( + sprintf( + '%s: %d', + $this->translateContext('Finished', 'console'), + $finishedCount + ) + ); + + $output->writeln(''); + $table->render(); + return self::SUCCESS; } } diff --git a/src/Database/Connection.php b/src/Database/Connection.php index 0623cc1..7cd2ce2 100644 --- a/src/Database/Connection.php +++ b/src/Database/Connection.php @@ -10,9 +10,6 @@ use ArrayAccess\TrayDigita\Traits\Manager\ManagerDispatcherTrait; use ArrayAccess\TrayDigita\Util\Filter\Consolidation; use ArrayAccess\TrayDigita\Util\Filter\ContainerHelper; -use DateTimeImmutable; -use DateTimeInterface; -use DateTimeZone; use Doctrine\Common\Collections\Selectable; use Doctrine\Common\EventManager; use Doctrine\DBAL\Configuration; @@ -315,37 +312,6 @@ protected function configureDriver(array|Config $params) : Driver } } - public function compareDateToSQLTimezone( - DateTimeInterface $from, - DateTimeInterface $to - ): string { - $seconds = ($from->getTimestamp() - $to->getTimestamp()); - return $this->convertOffsetToSQLTimezone($seconds); - } - - public function convertDateTimeZoneToSQLTimezone(DateTimeZone $timeZone) : string - { - return $this->convertDateToSQLTimezone( - (new DateTimeImmutable())->setTimezone($timeZone) - ); - } - - public function convertDateToSQLTimezone(DateTimeInterface $date) : string - { - return $this->convertOffsetToSQLTimezone($date->getOffset()); - } - - public function convertOffsetToSQLTimezone(int $seconds) : string - { - $hours = floor($seconds / 3600); - $minutes = floor($seconds / 60 % 60); - $hours = $hours < 10 && $hours >= 0 - ? "+0$hours" - : ($hours < 0 && $hours > -10 ? "-0" . (-$hours) : "+$hours"); - $minutes = $minutes < 10 ? "0$minutes" : $minutes; - return "$hours:$minutes"; - } - public function getConnection() : DoctrineConnection { return $this->setUpConnection(); diff --git a/src/Database/Wrapper/DriverWrapper.php b/src/Database/Wrapper/DriverWrapper.php index 1878579..c026460 100644 --- a/src/Database/Wrapper/DriverWrapper.php +++ b/src/Database/Wrapper/DriverWrapper.php @@ -6,6 +6,7 @@ use ArrayAccess\TrayDigita\Database\Connection; use ArrayAccess\TrayDigita\Event\Interfaces\ManagerInterface; use ArrayAccess\TrayDigita\Traits\Service\CallStackTraceTrait; +use ArrayAccess\TrayDigita\Util\Filter\Conversion; use DateTimeZone; use Doctrine\DBAL\Driver; use Doctrine\DBAL\Driver\Connection as DoctrineConnection; @@ -95,9 +96,7 @@ private function initConnection(Driver\Connection $connection): void $charset = $config['charset'] ?? 'utf8'; $timezone = $config['timezone'] ?? '+00:00'; if ($timezone instanceof DateTimeZone) { - $timezone = $this - ->databaseConnection - ->convertDateTimeZoneToSQLTimezone($timezone); + $timezone = Conversion::convertDateTimeZoneToSQLTimezone($timezone); } elseif (is_string($timezone)) { preg_match( '~^([+-])?\s*([0-9]{2})\s*:\s*([0-9]{2})$~', @@ -110,9 +109,7 @@ private function initConnection(Driver\Connection $connection): void } else { try { $timezone = new DateTimeZone($timezone); - $timezone = $this - ->databaseConnection - ->convertDateTimeZoneToSQLTimezone($timezone); + $timezone = Conversion::convertDateTimeZoneToSQLTimezone($timezone); } catch (Throwable) { $timezone = '+00:00'; } diff --git a/src/Exceptions/Runtime/TimeoutException.php b/src/Exceptions/Runtime/TimeoutException.php new file mode 100644 index 0000000..6538a0a --- /dev/null +++ b/src/Exceptions/Runtime/TimeoutException.php @@ -0,0 +1,8 @@ +executedTime; } + public function getExecutionDuration() : ?float + { + $start = $this->getExecuteTime(); + $end = $this->getExecutedTime(); + if ($start === null || $end === null) { + return null; + } + + return ($end - $start); + } + public function getNormalizeStatus() : int { return match ($this->status) { @@ -202,8 +214,10 @@ public function process() : self try { $this->executeTime = $this->createTime(); $message = $this->task->start($this); + $this->status = self::STATUS_SUCCESS; $this->executedTime = $this->createTime(); } catch (Throwable $e) { + $this->status = self::STATUS_FAILURE; $this->executedTime ??= $this->createTime(); $message = new Failure($e); } finally { @@ -227,7 +241,7 @@ public function isProgress(): bool public function isSuccess(): bool { - return $this->getStatusCode() === self::STATUS_PROGRESS; + return $this->getStatusCode() === self::STATUS_SUCCESS; } public function isSkipped(): bool diff --git a/src/Scheduler/Scheduler.php b/src/Scheduler/Scheduler.php index 9b3c74a..6c1cef8 100644 --- a/src/Scheduler/Scheduler.php +++ b/src/Scheduler/Scheduler.php @@ -1,5 +1,5 @@ @@ -65,6 +72,11 @@ class Scheduler implements ContainerAllocatorInterface, ManagerAllocatorInterfac */ private array $skipped = []; + /** + * @var ?array + */ + private ?array $shouldRunning = null; + private ?RecordLoaderInterface $recordLoader = null; public function __construct( @@ -152,7 +164,10 @@ public function createTask( if (!$found) { throw new UnsupportedArgumentException( sprintf( - 'Callback task must be contain return type or instance of: %s', + $this->translateContext( + 'Callback task must be contain return type or instance of: %s', + 'scheduler' + ), MessageInterface::class ) ); @@ -201,7 +216,6 @@ public function start(Runner $runner): MessageInterface ) { $this->{'previousCode'} = $previous->getStatusCode(); } - return $this; })->call( $object, @@ -215,7 +229,12 @@ public function start(Runner $runner): MessageInterface public function add(Task $scheduler): void { - $this->queue[spl_object_hash($scheduler)] = $scheduler; + // reset on add task + $id = spl_object_hash($scheduler); + if (!isset($this->queue[$id])) { + $this->shouldRunning = null; + } + $this->queue[$id] = $scheduler; } /** @@ -239,12 +258,54 @@ public function addCallable( return $task; } + /** + * @param Task $task + * @return ?DateTimeInterface + */ + public function getNextRunDate(Task $task): ?DateTimeInterface + { + $interval = $task->getInterval(); + if ($interval instanceof SchedulerTimeInterface) { + return $interval->getNextRunDate(); + } + if ($interval === 0) { + return null; + } + $time = time(); + $record = $this->getRecordLoader()->getRecord($task); + $interval = max($interval, Task::MINIMUM_INTERVAL_TIME); + $last_time = $record?->getLastExecutionTime()??0; + $last = $time - $last_time; + $nextRun = $last_time + $interval; + $next = $time - $nextRun; + if ($last_time === 0 || $last < 0 || $next > 0 || Runner::MAXIMUM_RUNNING_TIME <= $last_time) { + try { + // if not valid add current time + interval + return new DateTimeImmutable(date('c', $time + $interval)); + } catch (Exception) { + return (new DateTimeImmutable())->modify("+$interval seconds"); + } + } + try { + return new DateTimeImmutable(date('c', $nextRun)); + } catch (Exception) { + $interval = $time - $nextRun; + return (new DateTimeImmutable())->modify("+$interval seconds"); + } + } + public function shouldRun(Task $task) : bool { $interval = $task->getInterval(); + // zero should skipped if ($interval === 0) { return false; } + + if ($this->shouldRunning !== null && isset($this->shouldRunning[spl_object_hash($task)])) { + return true; + } + $record = $this->getRecordLoader()->getRecord($task); $last_time = $record?->getLastExecutionTime()??0; $last_status_code = $record?->getStatusCode()??Runner::STATUS_UNKNOWN; @@ -325,47 +386,101 @@ public function isSkipped(Task $task) : bool return isset($this->skipped[spl_object_hash($task)]); } + public function remove(Task $task): void + { + $id = spl_object_hash($task); + if (isset($this->queue[$id])) { + $this->shouldRunning = null; + unset($this->queue[$id]); + } + } + + /** + * @return array{queue: array, skip: array} + */ + public function getQueueProcessed(): array + { + if ($this->shouldRunning !== null) { + return [ + 'queue' => $this->shouldRunning, + 'skip' => $this->skipped, + ]; + } + + uasort( + $this->queue, + function (Task $a, Task $b) { + $a = $this->getRecordLoader()->getRecord($a)?->getLastExecutionTime()??[0]; + $b = $this->getRecordLoader()->getRecord($b)?->getLastExecutionTime()??[0]; + return $a === $b ? 0 : ($a < $b ? -1 : 1); + } + ); + + $queues = $this->queue; + $this->shouldRunning = []; + while ($queue = array_shift($queues)) { + $id = spl_object_hash($queue); + if ($this->shouldRun($queue)) { + $this->shouldRunning[$id] = $queue; + continue; + } + $this->skipped[$id] = $queue; + } + return [ + 'queue' => $this->shouldRunning, + 'skip' => $this->skipped, + ]; + } + /** + * @param ?int $timeout in seconds. if after processed time greater than time scheduler will stopped * @return int */ - public function run(): int + public function run(?int $timeout = null): int { if (($count = count($this->progress))) { throw new RuntimeException( sprintf( - 'Scheduler is on progress with (%s) %s remaining', - $count, - $count > 1 ? 'tasks' : 'task' + $this->translatePluralContext( + 'Scheduler is on progress with (%s) task remaining', + 'Scheduler is on progress with (%s) tasks remaining', + $count, + 'scheduler' + ), + $count ) ); } + $this->getQueueProcessed(); try { - uasort( - $this->queue, - function (Task $a, Task $b) { - $a = $this->getRecordLoader()->getRecord($a)?->getLastExecutionTime()??[0]; - $b = $this->getRecordLoader()->getRecord($b)?->getLastExecutionTime()??[0]; - return $a === $b ? 0 : ($a < $b ? -1 : 1); - } - ); + $startTime = time(); + $timedOut = is_int($timeout) && $timeout > 0 + ? $startTime + $timeout + : null; $manager = $this->getManager(); $processed = 0; - while ($queue = array_shift($this->queue)) { - $id = spl_object_hash($queue); - $record = $this - ->getRecordLoader() - ->getRecord($queue); - - // create current time - $time = time(); - // add in progress + if ($this->shouldRunning === null) { + $this->getQueueProcessed(); + $this->shouldRunning ??= []; + } + foreach ($this->shouldRunning as $id => $queue) { + unset($this->queue[$id], $this->shouldRunning[$id]); $this->progress[$id] = $queue; - $record ??= (new LastRecord( + } + // reset + $this->shouldRunning = null; + foreach ($this->progress as $id => $queue) { + $time = time(); + if ($timedOut !== null && $time > $timedOut) { + break; + } + // create current time + $record = $this->getRecordLoader()->getRecord($queue)??new LastRecord( $queue, $time, null - )); + ); // increment $processed++; // new runner @@ -375,37 +490,10 @@ function (Task $a, Task $b) { $record, $time ); - - if (!$this->shouldRun($queue)) { - $this->skipped[$id] = $queue; - $manager?->dispatch( - 'scheduler.beforeSkipTask', - $runner, - $time, - $this - ); - try { - $runner->skip(); - $manager?->dispatch( - 'scheduler.skipTask', - $runner, - $time, - $this - ); - } finally { - $manager?->dispatch( - 'scheduler.afterSkipTask', - $runner, - $time, - $this - ); - } - continue; - } - $ended = false; $manager?->dispatch( - 'scheduler.beforeProcessTask', + 'scheduler.beforeProcessing', + $queue, $runner, $time, $this @@ -429,7 +517,6 @@ function () use ( $this->{'status'} = Runner::STATUS_EXITED; })->call($runner); } - unset($this->progress[$id]); $this->finished[$id] = $runner; $this->getRecordLoader()->storeExitRunner( @@ -437,42 +524,50 @@ function () use ( $this ); $manager?->dispatch( - 'scheduler.afterProcessTask', + 'scheduler.afterProcessing', + $queue, + $runner, + $time, + $this + ); + $manager?->dispatch( + 'scheduler.exiting', + $queue, $runner, $time, $this ); } ); + try { // do process $runner->process(); - $manager?->dispatch( - 'scheduler.processTask', - $runner, - $time, - $this - ); + $ended = true; + $manager?->dispatch('scheduler.processing', $queue, $runner, $time, $this); } finally { $ended = true; // add records execution time & status - $this->getRecordLoader()->finish( - $time, - $runner, - $this - ); + $this->getRecordLoader()->finish($time, $runner, $this); // done $this->finished[$id] = $runner; // remove progress unset($this->progress[$id]); $manager?->dispatch( - 'scheduler.afterProcessTask', + 'scheduler.afterProcessing', + $queue, $runner, $time, $this ); } } + + // put back if time out + foreach ($this->progress as $id => $queue) { + unset($this->progress[$id]); + $this->queue[$id] = $queue; + } } finally { return $processed; } diff --git a/src/Util/Filter/Conversion.php b/src/Util/Filter/Conversion.php new file mode 100644 index 0000000..8e05955 --- /dev/null +++ b/src/Util/Filter/Conversion.php @@ -0,0 +1,51 @@ +getTimestamp() - $to->getTimestamp()); + return self::convertOffsetToSQLTimezone($seconds); + } + + public static function convertDateTimeZoneToSQLTimezone(DateTimeZone $timeZone) : string + { + return self::convertDateToSQLTimezone( + (new DateTimeImmutable())->setTimezone($timeZone) + ); + } + + public static function convertDateToSQLTimezone(DateTimeInterface $date) : string + { + return self::convertOffsetToSQLTimezone($date->getOffset()); + } + + public static function convertOffsetToSQLTimezone(int $seconds) : string + { + $hours = floor($seconds / 3600); + $minutes = floor($seconds / 60 % 60); + $hours = $hours < 10 && $hours >= 0 + ? "+0$hours" + : ($hours < 0 && $hours > -10 ? "-0" . (-$hours) : "+$hours"); + $minutes = $minutes < 10 ? "0$minutes" : $minutes; + return "$hours:$minutes"; + } +}