Skip to content

Commit

Permalink
Merge pull request #46 from nonanerz/dev
Browse files Browse the repository at this point in the history
Removed abandoned sensio bundle
  • Loading branch information
mcfedr authored Dec 6, 2023
2 parents 562f1a3 + 3daaacd commit e7aa12c
Show file tree
Hide file tree
Showing 46 changed files with 2,014 additions and 2,191 deletions.
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"php": ">=8.0",
"ext-json": "*",
"symfony/framework-bundle": "^5.0|^6.0",
"sensio/framework-extra-bundle": "^5.0|^6.0",
"nesbot/carbon": "^1|^2",
"ramsey/uuid": "^3.7|^4.1"
},
Expand Down
3,552 changes: 1,718 additions & 1,834 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/Mcfedr/QueueManagerBundle/Command/BeanstalkCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected function configure(): void

protected function handleInput(InputInterface $input): void
{
if (($queues = $input->getOption('queue'))) {
if ($queues = $input->getOption('queue')) {
foreach (explode(',', $queues) as $queue) {
$this->pheanstalk->watch($queue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected function getJobs(): ?JobBatch
$em->getConnection()->commit();

if (\count($jobs)) {
return new JobBatch(array_map(function (DoctrineDelayJob $job) {
return new JobBatch(array_map(static function (DoctrineDelayJob $job) {
return new DoctrineDelayWorkerJob($job);
}, $jobs));
}
Expand Down Expand Up @@ -152,7 +152,7 @@ protected function finishJobs(JobBatch $batch): void

protected function handleInput(InputInterface $input): void
{
if (($batch = $input->getOption('batch-size'))) {
if ($batch = $input->getOption('batch-size')) {
$this->batchSize = (int) $batch;
}
$this->reverse = $input->getOption('reverse');
Expand Down
6 changes: 3 additions & 3 deletions src/Mcfedr/QueueManagerBundle/Command/PubSubRunnerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected function finishJobs(JobBatch $batch): void
$topic = $this->pubSub->topic($batch->getOption('topic'));

$topic->publishBatch(
array_map(function (PubSubJob $retryJob) {
array_map(static function (PubSubJob $retryJob) {
$retryJob->incrementRetryCount();

return ['data' => $retryJob->getMessageBody()];
Expand All @@ -69,7 +69,7 @@ protected function finishJobs(JobBatch $batch): void
/** @var PubSubJob[] $toAcknowledge */
$toAcknowledge = array_merge($batch->getOks(), $batch->getRetries(), $batch->getFails());
if (\count($toAcknowledge)) {
$toAcknowledge = array_map(function (PubSubJob $message) {
$toAcknowledge = array_map(static function (PubSubJob $message) {
return $pubSubMessage = (new Message(['messageId' => $message->getId()], ['ackId' => $message->getAckId()]));
}, $toAcknowledge);

Expand All @@ -89,7 +89,7 @@ protected function handleInput(InputInterface $input): void
$this->pubSubQueues = ['default' => $this->defaultQueue];
}

if (($batch = $input->getOption('batch-size'))) {
if ($batch = $input->getOption('batch-size')) {
$this->batchSize = (int) $batch;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Mcfedr/QueueManagerBundle/Command/RunnerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private function executeBatch(): void
$this->jobs = $this->getJobs();
if ($this->jobs) {
$this->jobExecutor->startBatch($this->jobs);
while (($job = $this->jobs->next())) {
while ($job = $this->jobs->next()) {
$result = $this->executeJob($job);
$this->jobs->result($result);
}
Expand All @@ -174,7 +174,7 @@ private function executeBatchWithProcess(InputInterface $input, OutputInterface
{
$process = $this->getProcess($input);

$process->run(function ($type, $data) use ($output): void {
$process->run(static function ($type, $data) use ($output): void {
$output->write($data);
});
}
Expand Down
18 changes: 9 additions & 9 deletions src/Mcfedr/QueueManagerBundle/Command/SqsRunnerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected function finishJobs(JobBatch $batch): void
$count = 0;
$this->sqs->deleteMessageBatch([
'QueueUrl' => $batch->getOption('url'),
'Entries' => array_map(function (SqsJob $job) use (&$count) {
'Entries' => array_map(static function (SqsJob $job) use (&$count) {
++$count;

return [
Expand All @@ -103,7 +103,7 @@ protected function finishJobs(JobBatch $batch): void
$count = 0;
$this->sqs->changeMessageVisibilityBatch([
'QueueUrl' => $batch->getOption('url'),
'Entries' => array_map(function (SqsJob $job) use (&$count) {
'Entries' => array_map(static function (SqsJob $job) use (&$count) {
++$count;
$job->incrementRetryCount();

Expand All @@ -119,9 +119,9 @@ protected function finishJobs(JobBatch $batch): void

protected function handleInput(InputInterface $input): void
{
if (($url = $input->getOption('url'))) {
if ($url = $input->getOption('url')) {
$this->urls = explode(',', $url);
} elseif (($queue = $input->getOption('queue'))) {
} elseif ($queue = $input->getOption('queue')) {
$this->urls = array_map(function ($queue) {
return $this->queues[$queue];
}, explode(',', $queue));
Expand All @@ -133,15 +133,15 @@ protected function handleInput(InputInterface $input): void
$this->waitTime = 0;
}

if (($timeout = $input->getOption('timeout'))) {
if ($timeout = $input->getOption('timeout')) {
$this->visibilityTimeout = (int) $timeout;
}

if (($batch = $input->getOption('batch-size'))) {
if ($batch = $input->getOption('batch-size')) {
$this->batchSize = (int) $batch;
}

if (($waitTime = $input->getOption('wait-time'))) {
if ($waitTime = $input->getOption('wait-time')) {
$this->waitTime = (int) $waitTime;
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ private function getJobsFromUrl($url): ?JobBatch
$count = 0;
$this->sqs->deleteMessageBatch([
'QueueUrl' => $url,
'Entries' => array_map(function ($handle) use (&$count) {
'Entries' => array_map(static function ($handle) use (&$count) {
++$count;

return [
Expand All @@ -210,7 +210,7 @@ private function getJobsFromUrl($url): ?JobBatch
$count = 0;
$this->sqs->changeMessageVisibilityBatch([
'QueueUrl' => $url,
'Entries' => array_map(function (SqsJob $job) use (&$count) {
'Entries' => array_map(static function (SqsJob $job) use (&$count) {
++$count;

return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private function createManager(ContainerBuilder $container, array $config, strin
break;

case 'doctrine_delay':
if (!isset(($container->getParameter('kernel.bundles'))['DoctrineBundle'])) {
if (!isset($container->getParameter('kernel.bundles')['DoctrineBundle'])) {
throw new \LogicException('"doctrine_delay" requires doctrine/doctrine-bundle to be installed.');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private function createMap(ContainerBuilder $container, string $callerId, int $a
foreach ($tags as $attributes) {
if (isset($attributes['id'])) {
$serviceMap[$attributes['id']] = new Reference($id);
} elseif ($method && method_exists(($class = $container->getDefinition($id)->getClass()), $method)) {
} elseif ($method && method_exists($class = $container->getDefinition($id)->getClass(), $method)) {
$serviceMap[$class::$method()] = new Reference($id);
} else {
$serviceMap[$id] = new Reference($id);
Expand Down
54 changes: 15 additions & 39 deletions src/Mcfedr/QueueManagerBundle/Entity/DoctrineDelayJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,60 +8,36 @@
use Doctrine\ORM\Mapping as ORM;
use Mcfedr\QueueManagerBundle\Queue\RetryableJob;

/**
* @ORM\Entity
* @ORM\Table(name="DoctrineDelayJob", indexes={
* @ORM\Index(columns={"time"}),
* @ORM\Index(columns={"processing"})
* })
*/
#[ORM\Entity]
#[ORM\Table(name: 'DoctrineDelayJob')]
#[ORM\Index(columns: ['time'])]
#[ORM\Index(columns: ['processing'])]
class DoctrineDelayJob implements RetryableJob
{
/**
* @ORM\Column(name="id", type="integer")
* @ORM\Id
* @ORM\GeneratedValue(strategy="AUTO")
*/
#[ORM\Column(name: 'id', type: 'integer')]
#[ORM\Id]
#[ORM\GeneratedValue(strategy: 'AUTO')]
private ?int $id = null;

/**
* @ORM\Column(type="string", length=255)
*/
#[ORM\Column(type: 'string', length: 255)]
private string $name;

/**
* @ORM\Column(type="json")
*/
#[ORM\Column(type: 'json')]
private array $arguments;

/**
* @ORM\Column(type="json")
*/
#[ORM\Column(type: 'json')]
private array $options;

/**
* @ORM\Column(type="string", length=255, nullable=true)
*/
#[ORM\Column(type: 'string', length: 255, nullable: true)]
private ?string $manager;

/**
* @ORM\Column(name="time", type="datetime")
*/
#[ORM\Column(name: 'time', type: 'datetime')]
private \DateTime $time;

/**
* @ORM\Column(type="datetime")
*/
#[ORM\Column(type: 'datetime')]
private \DateTime $createdAt;

/**
* @ORM\Column(name="processing", type="boolean")
*/
#[ORM\Column(name: 'processing', type: 'boolean')]
private bool $processing = false;

/**
* @ORM\Column(type="integer")
*/
#[ORM\Column(type: 'integer')]
private int $retryCount;

public function __construct(string $name, array $arguments, array $options, ?string $manager, \DateTime $time, int $retryCount = 0)
Expand Down
4 changes: 1 addition & 3 deletions src/Mcfedr/QueueManagerBundle/Event/FinishedJobEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Event;

class FinishedJobEvent extends JobEvent
{
}
class FinishedJobEvent extends JobEvent {}
4 changes: 1 addition & 3 deletions src/Mcfedr/QueueManagerBundle/Event/StartJobEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Event;

class StartJobEvent extends JobEvent
{
}
class StartJobEvent extends JobEvent {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class FailedToForkException extends QueueManagerException
{
}
class FailedToForkException extends QueueManagerException {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class InvalidWorkerException extends QueueManagerException implements UnrecoverableJobExceptionInterface
{
}
class InvalidWorkerException extends QueueManagerException implements UnrecoverableJobExceptionInterface {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class JobNotDeletableException extends QueueManagerException
{
}
class JobNotDeletableException extends QueueManagerException {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class NoSuchJobException extends JobNotDeletableException
{
}
class NoSuchJobException extends JobNotDeletableException {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class QueueManagerException extends \Exception
{
}
class QueueManagerException extends \Exception {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class UnexpectedJobDataException extends QueueManagerException
{
}
class UnexpectedJobDataException extends QueueManagerException {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class UnrecoverableJobException extends QueueManagerException implements UnrecoverableJobExceptionInterface
{
}
class UnrecoverableJobException extends QueueManagerException implements UnrecoverableJobExceptionInterface {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

interface UnrecoverableJobExceptionInterface extends \Throwable
{
}
interface UnrecoverableJobExceptionInterface extends \Throwable {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class WrongJobException extends JobNotDeletableException
{
}
class WrongJobException extends JobNotDeletableException {}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function put(string $name, array $arguments = [], array $options = []): J

public function delete(Job $job): void
{
if (!($job instanceof BeanstalkJob)) {
if (!$job instanceof BeanstalkJob) {
throw new WrongJobException('Beanstalk manager can only delete beanstalk jobs');
}

Expand Down
4 changes: 1 addition & 3 deletions src/Mcfedr/QueueManagerBundle/Queue/InternalWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@
*
* @internal
*/
interface InternalWorker extends Worker
{
}
interface InternalWorker extends Worker {}
2 changes: 1 addition & 1 deletion src/Mcfedr/QueueManagerBundle/Runner/JobExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected function failedJob(Job $job, \Throwable $exception, bool $internal): v
'retryable' => !$exception instanceof UnrecoverableJobExceptionInterface,
'internal' => $internal,
];
if (($p = $exception->getPrevious())) {
if ($p = $exception->getPrevious()) {
$context['cause'] = $p->getMessage();
}
$this->logger->error('Job failed.', $context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected function job($period)
{
$time = new Carbon();

return function () use (&$time, $period) {
return static function () use (&$time, $period) {
Carbon::setTestNow($time);
$time = Carbon::createFromTimestamp(PeriodicWorker::nextRun($period)->getTimestamp());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ protected function job($period)
{
$currentTime = 0;

return function () use (&$currentTime, $period) {
return static function () use (&$currentTime, $period) {
$currentTime += random_int($period, $period * 2);

return $currentTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Driver;

class TestRetryQueueManager extends TestQueueManager
{
}
class TestRetryQueueManager extends TestQueueManager {}
4 changes: 1 addition & 3 deletions tests/Mcfedr/QueueManagerBundle/Exception/TestException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Exception;

class TestException extends \Exception implements UnrecoverableJobExceptionInterface
{
}
class TestException extends \Exception implements UnrecoverableJobExceptionInterface {}
4 changes: 1 addition & 3 deletions tests/Mcfedr/QueueManagerBundle/Queue/TestJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@

namespace Mcfedr\QueueManagerBundle\Queue;

class TestJob extends AbstractJob
{
}
class TestJob extends AbstractJob {}
Loading

0 comments on commit e7aa12c

Please sign in to comment.