-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
166 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,85 +1,150 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Sbooker\CommandBus\Infrastructure\Persistence; | ||
|
||
use Doctrine\DBAL\LockMode; | ||
use Doctrine\DBAL\Types\Types; | ||
use Doctrine\DBAL\Connection; | ||
use Doctrine\DBAL\Exception as DBALException; | ||
use Doctrine\DBAL\ParameterType; | ||
use Doctrine\DBAL\Platforms\AbstractPlatform; | ||
use Doctrine\DBAL\Query\Expression\ExpressionBuilder; | ||
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode; | ||
use Doctrine\DBAL\Query\QueryBuilder; | ||
use Doctrine\ORM\EntityRepository; | ||
use Doctrine\ORM\QueryBuilder; | ||
use Doctrine\ORM\Exception\ORMException; | ||
use Doctrine\ORM\NonUniqueResultException; | ||
use Doctrine\ORM\Query\ResultSetMappingBuilder; | ||
use Doctrine\Persistence\Mapping\MappingException; | ||
use Ramsey\Uuid\UuidInterface; | ||
use Sbooker\CommandBus\Command; | ||
use Sbooker\CommandBus\ReadStorage; | ||
use Sbooker\CommandBus\Status; | ||
use Sbooker\CommandBus\WriteStorage; | ||
|
||
class DoctrineRepository extends EntityRepository implements WriteStorage, ReadStorage | ||
final class DoctrineRepository extends EntityRepository implements WriteStorage, ReadStorage | ||
{ | ||
public function get(UuidInterface $id): ?Command | ||
{ | ||
return $this->find($id); | ||
} | ||
|
||
/** | ||
* @throws \Doctrine\ORM\NonUniqueResultException | ||
* @throws \Doctrine\ORM\TransactionRequiredException | ||
* @throws ORMException | ||
* @throws \ReflectionException | ||
* @throws MappingException | ||
*/ | ||
public function getAndLock(array $names, UuidInterface $id): ?Command | ||
{ | ||
return | ||
$this->createQueryBuilderWithNamesCondition('t', $names) | ||
->andWhere('t.id = :id') | ||
->setParameter('id', $id) | ||
->getQuery() | ||
->setLockMode(LockMode::PESSIMISTIC_WRITE) | ||
->getOneOrNullResult() | ||
; | ||
$alias = 'c'; | ||
$qb = $this->createDbalQueryBuilderWithCommonExpression($alias, $names); | ||
$sql = $qb | ||
->andWhere('c.id = :id') | ||
->getSQL(); | ||
|
||
return $this->findCommandBySQL($alias, $sql, ['id' => $id->toString()]); | ||
} | ||
|
||
/** | ||
* @throws \Doctrine\ORM\NonUniqueResultException | ||
* @throws ORMException | ||
* @throws \ReflectionException | ||
* @throws MappingException|DBALException | ||
*/ | ||
public function get(UuidInterface $id): ?Command | ||
public function getFirstToProcessAndLock(array $names): ?Command | ||
{ | ||
return $this->find($id); | ||
$alias = 'c'; | ||
$qb = $this->createDbalQueryBuilderWithCommonExpression($alias, $names); | ||
|
||
$sql = $qb | ||
->andWhere( | ||
$this->buildInExpr( | ||
$qb->expr(), | ||
'c.status', | ||
[ | ||
Status::created()->getRawValue(), | ||
Status::pending()->getRawValue() | ||
] | ||
) | ||
) | ||
->andWhere('c.next_attempt_at < :now') | ||
->orderBy('c.next_attempt_at', 'ASC') | ||
->setMaxResults(1) | ||
->getSQL() | ||
; | ||
|
||
return $this->findCommandBySQL($alias, $sql, [ | ||
'now' => (new \DateTimeImmutable())->format($this->getPlatform()->getDateTimeTzFormatString()), | ||
]); | ||
} | ||
|
||
/** | ||
* @throws \Doctrine\ORM\NonUniqueResultException | ||
* @throws \Doctrine\ORM\TransactionRequiredException | ||
* @throws \ReflectionException | ||
* @throws MappingException | ||
*/ | ||
public function getFirstToProcessAndLock(array $names): ?Command | ||
private function createDbalQueryBuilderWithCommonExpression(string $alias, array $names): QueryBuilder | ||
{ | ||
$builder = $this->createQueryBuilderWithNamesCondition('t', $names); | ||
$expr = $builder->expr(); | ||
$qb = $this->getConnection()->createQueryBuilder(); | ||
$qb | ||
->select("$alias.*") | ||
->from($this->getTableName(), $alias) | ||
->forUpdate(ConflictResolutionMode::SKIP_LOCKED) | ||
; | ||
|
||
if ([] !== $names) { | ||
$qb->andWhere($this->buildInExpr($qb->expr(), "$alias.name", $names)); | ||
} | ||
|
||
return $qb; | ||
} | ||
|
||
private function buildInExpr(ExpressionBuilder $expr, string $field, array $values): string | ||
{ | ||
if ([] === $values) { | ||
throw new \InvalidArgumentException("Parameter values must not be empty"); | ||
} | ||
|
||
return | ||
$builder | ||
->andWhere( | ||
$expr->in( | ||
"t.workflow.status", | ||
[ | ||
Status::created()->getRawValue(), | ||
Status::pending()->getRawValue() | ||
] | ||
) | ||
$expr->in( | ||
$field, | ||
array_map( | ||
fn(string $name) => $expr->literal($name, ParameterType::STRING), | ||
$values | ||
) | ||
->andWhere('t.attemptCounter.nextAttemptAt < :now') | ||
->orderBy('t.attemptCounter.nextAttemptAt', 'ASC') | ||
->setParameter('now', new \DateTimeImmutable(), Types::DATETIMETZ_IMMUTABLE) | ||
->setMaxResults(1) | ||
->getQuery() | ||
->setLockMode(LockMode::PESSIMISTIC_WRITE) | ||
->getOneOrNullResult(); | ||
); | ||
} | ||
|
||
private function createQueryBuilderWithNamesCondition(string $alias, array $names): QueryBuilder | ||
/** | ||
* @throws NonUniqueResultException | ||
*/ | ||
private function findCommandBySQL(string $tableAlias, string $sql, array $parameters): ?Command | ||
{ | ||
$builder = $this->createQueryBuilder($alias); | ||
$rsm = new ResultSetMappingBuilder($this->getEntityManager()); | ||
$rsm->addRootEntityFromClassMetadata(Command::class, $tableAlias); | ||
|
||
if ([] === $names) { | ||
return $builder; | ||
} | ||
return | ||
$this->getEntityManager() | ||
->createNativeQuery($sql, $rsm) | ||
->setParameters($parameters) | ||
->getOneOrNullResult(); | ||
} | ||
|
||
$expr = $builder->expr(); | ||
$builder->andWhere($expr->in("$alias.normalizedCommand.name", $names)); | ||
/** | ||
* @throws MappingException | ||
* @throws \ReflectionException | ||
*/ | ||
private function getTableName(): string | ||
{ | ||
return $this->getEntityManager()->getMetadataFactory()->getMetadataFor(Command::class)->getTableName(); | ||
} | ||
|
||
return $builder; | ||
/** | ||
* @throws DBALException | ||
*/ | ||
private function getPlatform(): AbstractPlatform | ||
{ | ||
return $this->getConnection()->getDatabasePlatform(); | ||
} | ||
|
||
private function getConnection(): Connection | ||
{ | ||
return $this->getEntityManager()->getConnection(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,24 @@ | ||
all: up install test down | ||
dc := docker compose -p cb -f ./build/docker-compose.yaml | ||
dcup := $(dc) up --build -d | ||
|
||
up: | ||
docker-compose -p cb -f ./build/docker-compose.yaml up --build -d | ||
|
||
install: | ||
docker-compose -p cb -f ./build/docker-compose.yaml exec app composer install | ||
all: | ||
up-db | ||
up-php VER=8.3 | ||
update | ||
test | ||
down | ||
|
||
up-db: | ||
$(dcup) mysql8 && \ | ||
$(dcup) pgsql12 | ||
up-php: | ||
$(dc) build --build-arg PHP_VER=$(VER) && \ | ||
$(dc) up -d app | ||
update: | ||
$(dc) exec app composer update | ||
test: | ||
docker-compose -p cb -f ./build/docker-compose.yaml exec app ./vendor/bin/phpunit ./tests/Infrastructure/Persistence | ||
|
||
$(dc) exec app ./vendor/bin/phpunit ./tests/Infrastructure/Persistence | ||
down: | ||
docker-compose -p cb -f ./build/docker-compose.yaml down --rmi all | ||
$(dc) down | ||
down-all: | ||
$(dc) down --rmi all |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.