Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/replace workspace and contentstreamfinder #5097

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public function workspace(): string
return $this->tableNamePrefix . '_workspace';
}

public function contentStream(): string
{
return $this->tableNamePrefix . '_contentstream';
}

public function checkpoint(): string
{
return $this->tableNamePrefix . '_checkpoint';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception as DbalException;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\ContentStream;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeMove;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeRemoval;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeVariation;
Expand All @@ -23,6 +24,10 @@
use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Feature\Common\InterdimensionalSiblings;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Feature\ContentStreamEventStreamName;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Event\DimensionShineThroughWasAdded;
Expand Down Expand Up @@ -67,6 +72,7 @@
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamState;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;

Expand All @@ -76,6 +82,7 @@
*/
final class DoctrineDbalContentGraphProjection implements ProjectionInterface, WithMarkStaleInterface
{
use ContentStream;
use NodeMove;
use NodeRemoval;
use NodeVariation;
Expand Down Expand Up @@ -105,14 +112,21 @@ public function setUp(): void
{
$statements = $this->determineRequiredSqlStatements();

// MIGRATION from 2024-05-23: copy data from "cr_<crid>_p_workspace" to "cr_<crid>_p_graph_workspace" table
// MIGRATION from 2024-05-25: copy data from "cr_<crid>_p_workspace"/"cr_<crid>_p_contentstream" to "cr_<crid>_p_graph_workspace"/"cr_<crid>_p_graph_contentstream" tables
$legacyWorkspaceTableName = str_replace('_p_graph_workspace', '_p_workspace', $this->tableNames->workspace());
if (
$this->dbal->getSchemaManager()->tablesExist([$legacyWorkspaceTableName])
&& !$this->dbal->getSchemaManager()->tablesExist([$this->tableNames->workspace()])
) {
$statements[] = 'INSERT INTO ' . $this->tableNames->workspace() . ' (workspacename, baseworkspacename, currentcontentstreamid, status) SELECT workspacename, baseworkspacename, currentcontentstreamid, status FROM ' . $legacyWorkspaceTableName;
}
$legacyContentStreamTableName = str_replace('_p_graph_contentstream', '_p_contentstream', $this->tableNames->contentStream());
if (
$this->dbal->getSchemaManager()->tablesExist([$legacyContentStreamTableName])
&& !$this->dbal->getSchemaManager()->tablesExist([$this->tableNames->contentStream()])
) {
$statements[] = 'INSERT INTO ' . $this->tableNames->contentStream() . ' (contentStreamId, version, sourceContentStreamId, state, removed) SELECT contentStreamId, version, sourceContentStreamId, state, removed FROM ' . $legacyContentStreamTableName;
}
// /MIGRATION

foreach ($statements as $statement) {
Expand Down Expand Up @@ -177,8 +191,11 @@ public function getState(): ContentGraphFinder
public function canHandle(EventInterface $event): bool
{
return in_array($event::class, [
ContentStreamWasClosed::class,
ContentStreamWasCreated::class,
ContentStreamWasForked::class,
ContentStreamWasRemoved::class,
ContentStreamWasReopened::class,
DimensionShineThroughWasAdded::class,
DimensionSpacePointWasMoved::class,
NodeAggregateNameWasChanged::class,
Expand Down Expand Up @@ -211,8 +228,11 @@ public function canHandle(EventInterface $event): bool
public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
{
match ($event::class) {
ContentStreamWasClosed::class => $this->whenContentStreamWasClosed($event),
ContentStreamWasCreated::class => $this->whenContentStreamWasCreated($event),
ContentStreamWasForked::class => $this->whenContentStreamWasForked($event),
ContentStreamWasRemoved::class => $this->whenContentStreamWasRemoved($event),
ContentStreamWasReopened::class => $this->whenContentStreamWasReopened($event),
DimensionShineThroughWasAdded::class => $this->whenDimensionShineThroughWasAdded($event),
DimensionSpacePointWasMoved::class => $this->whenDimensionSpacePointWasMoved($event),
NodeAggregateNameWasChanged::class => $this->whenNodeAggregateNameWasChanged($event, $eventEnvelope),
Expand Down Expand Up @@ -241,6 +261,19 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
WorkspaceWasRemoved::class => $this->whenWorkspaceWasRemoved($event),
default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))),
};
if (ContentStreamEventStreamName::isContentStreamStreamName($eventEnvelope->streamName)) {
$this->updateContentStreamVersion(ContentStreamEventStreamName::extractContentStreamIdFromStreamName($eventEnvelope->streamName), $eventEnvelope->version);
}
}

private function whenContentStreamWasClosed(ContentStreamWasClosed $event): void
{
$this->updateContentStreamState($event->contentStreamId, ContentStreamState::STATE_CLOSED);
}

private function whenContentStreamWasCreated(ContentStreamWasCreated $event): void
{
$this->createContentStream($event->contentStreamId, ContentStreamState::STATE_CREATED);
}

private function whenContentStreamWasForked(ContentStreamWasForked $event): void
Expand Down Expand Up @@ -278,6 +311,8 @@ private function whenContentStreamWasForked(ContentStreamWasForked $event): void

// NOTE: as reference edges are attached to Relation Anchor Points (and they are lazily copy-on-written),
// we do not need to copy reference edges here (but we need to do it during copy on write).

$this->createContentStream($event->newContentStreamId, ContentStreamState::STATE_FORKED);
}

private function whenContentStreamWasRemoved(ContentStreamWasRemoved $event): void
Expand Down Expand Up @@ -321,6 +356,13 @@ private function whenContentStreamWasRemoved(ContentStreamWasRemoved $event): vo
} catch (DbalException $e) {
throw new \RuntimeException(sprintf('Failed to delete non-referenced reference relations: %s', $e->getMessage()), 1716489328, $e);
}

$this->removeContentStream($event->contentStreamId);
}

private function whenContentStreamWasReopened(ContentStreamWasReopened $event): void
{
$this->updateContentStreamState($event->contentStreamId, $event->previousState);
}

private function whenDimensionShineThroughWasAdded(DimensionShineThroughWasAdded $event): void
Expand Down Expand Up @@ -675,6 +717,9 @@ private function whenRootNodeAggregateWithNodeWasCreated(RootNodeAggregateWithNo
private function whenRootWorkspaceWasCreated(RootWorkspaceWasCreated $event): void
{
$this->createWorkspace($event->workspaceName, null, $event->newContentStreamId);

// the content stream is in use now
$this->updateContentStreamState($event->newContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);
}

private function whenSubtreeWasTagged(SubtreeWasTagged $event): void
Expand All @@ -695,24 +740,39 @@ private function whenWorkspaceBaseWorkspaceWasChanged(WorkspaceBaseWorkspaceWasC
private function whenWorkspaceRebaseFailed(WorkspaceRebaseFailed $event): void
{
$this->markWorkspaceAsOutdatedConflict($event->workspaceName);
$this->updateContentStreamState($event->candidateContentStreamId, ContentStreamState::STATE_REBASE_ERROR);
}

private function whenWorkspaceWasCreated(WorkspaceWasCreated $event): void
{
$this->createWorkspace($event->workspaceName, $event->baseWorkspaceName, $event->newContentStreamId);

// the content stream is in use now
$this->updateContentStreamState($event->newContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);
}

private function whenWorkspaceWasDiscarded(WorkspaceWasDiscarded $event): void
{
$this->updateWorkspaceContentStreamId($event->workspaceName, $event->newContentStreamId);
$this->markWorkspaceAsOutdated($event->workspaceName);
$this->markDependentWorkspacesAsOutdated($event->workspaceName);

// the new content stream is in use now
$this->updateContentStreamState($event->newContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);
// the previous content stream is no longer in use
$this->updateContentStreamState($event->previousContentStreamId, ContentStreamState::STATE_NO_LONGER_IN_USE);
}

private function whenWorkspaceWasPartiallyDiscarded(WorkspaceWasPartiallyDiscarded $event): void
{
$this->updateWorkspaceContentStreamId($event->workspaceName, $event->newContentStreamId);
$this->markDependentWorkspacesAsOutdated($event->workspaceName);

// the new content stream is in use now
$this->updateContentStreamState($event->newContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);

// the previous content stream is no longer in use
$this->updateContentStreamState($event->previousContentStreamId, ContentStreamState::STATE_NO_LONGER_IN_USE);
}

private function whenWorkspaceWasPartiallyPublished(WorkspaceWasPartiallyPublished $event): void
Expand All @@ -725,6 +785,12 @@ private function whenWorkspaceWasPartiallyPublished(WorkspaceWasPartiallyPublish
$this->markWorkspaceAsUpToDate($event->sourceWorkspaceName);

$this->markDependentWorkspacesAsOutdated($event->sourceWorkspaceName);

// the new content stream is in use now
$this->updateContentStreamState($event->newSourceContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);

// the previous content stream is no longer in use
$this->updateContentStreamState($event->previousSourceContentStreamId, ContentStreamState::STATE_NO_LONGER_IN_USE);
}

private function whenWorkspaceWasPublished(WorkspaceWasPublished $event): void
Expand All @@ -737,6 +803,12 @@ private function whenWorkspaceWasPublished(WorkspaceWasPublished $event): void
$this->markWorkspaceAsUpToDate($event->sourceWorkspaceName);

$this->markDependentWorkspacesAsOutdated($event->sourceWorkspaceName);

// the new content stream is in use now
$this->updateContentStreamState($event->newSourceContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);

// the previous content stream is no longer in use
$this->updateContentStreamState($event->previousSourceContentStreamId, ContentStreamState::STATE_NO_LONGER_IN_USE);
}

private function whenWorkspaceWasRebased(WorkspaceWasRebased $event): void
Expand All @@ -746,6 +818,12 @@ private function whenWorkspaceWasRebased(WorkspaceWasRebased $event): void

// When the rebase is successful, we can set the status of the workspace back to UP_TO_DATE.
$this->markWorkspaceAsUpToDate($event->workspaceName);

// the new content stream is in use now
$this->updateContentStreamState($event->newContentStreamId, ContentStreamState::STATE_IN_USE_BY_WORKSPACE);

// the previous content stream is no longer in use
$this->updateContentStreamState($event->previousContentStreamId, ContentStreamState::STATE_NO_LONGER_IN_USE);
}

private function whenWorkspaceWasRemoved(WorkspaceWasRemoved $event): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public function buildSchema(AbstractSchemaManager $schemaManager): Schema
$this->createReferenceRelationTable(),
$this->createDimensionSpacePointsTable(),
$this->createWorkspaceTable(),
$this->createContentStreamTable(),
]);
}

Expand Down Expand Up @@ -112,6 +113,18 @@ private function createWorkspaceTable(): Table
return $workspaceTable->setPrimaryKey(['workspacename']);
}

private function createContentStreamTable(): Table
{
return self::createTable($this->tableNames->contentStream(), [
DbalSchemaFactory::columnForContentStreamId('contentStreamId')->setNotnull(true),
(new Column('version', Type::getType(Types::INTEGER)))->setNotnull(true),
DbalSchemaFactory::columnForContentStreamId('sourceContentStreamId')->setNotnull(false),
// Should become a DB ENUM (unclear how to configure with DBAL) or int (latter needs adaption to code)
(new Column('state', Type::getType(Types::BINARY)))->setLength(20)->setNotnull(true),
(new Column('removed', Type::getType(Types::BOOLEAN)))->setDefault(false)->setNotnull(false)
]);
}

/**
* @param array<Column> $columns
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

declare(strict_types=1);

namespace Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature;

use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamState;
use Neos\EventStore\Model\Event\Version;

/**
* The ContentStream projection feature trait
*
* @internal
*/
trait ContentStream
{
private function createContentStream(ContentStreamId $contentStreamId, ContentStreamState $state, ?ContentStreamId $sourceContentStreamId = null): void
{
$this->dbal->insert($this->tableNames->contentStream(), [
'contentStreamId' => $contentStreamId->value,
'sourceContentStreamId' => $sourceContentStreamId?->value,
'version' => 0,
'state' => $state->value,
]);
}

private function updateContentStreamState(ContentStreamId $contentStreamId, ContentStreamState $state): void
{
$this->dbal->update($this->tableNames->contentStream(), [
'state' => $state->value,
], [
'contentStreamId' => $contentStreamId->value
]);
}

private function removeContentStream(ContentStreamId $contentStreamId): void
{
$this->dbal->update($this->tableNames->contentStream(), [
'removed' => true,
], [
'contentStreamId' => $contentStreamId->value
]);
}

private function updateContentStreamVersion(ContentStreamId $contentStreamId, Version $version): void
{
$this->dbal->update($this->tableNames->contentStream(), [
'version' => $version->value,
], [
'contentStreamId' => $contentStreamId->value,
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
use Neos\ContentRepository\Core\CommandHandler\CommandInterface;
use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\Projection\Workspace\Workspace;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamState;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\EventStore\Model\Event\Version;

/**
* An adapter to provide aceess to read projection data and delegate (sub) commands
Expand All @@ -46,14 +47,28 @@ public function handle(CommandInterface $command): CommandResult
return $this->contentRepository->handle($command);
}

public function getWorkspaceFinder(): WorkspaceFinder
public function getContentStreamVersion(ContentStreamId $contentStreamId): Version
{
return $this->contentRepository->getWorkspaceFinder();
// TODO implement
return Version::fromInteger(1);
}

public function getContentStreamFinder(): ContentStreamFinder
public function contentStreamExists(ContentStreamId $contentStreamId): bool
{
return $this->contentRepository->getContentStreamFinder();
// TODO implement
return false;
}

public function getContentStreamState(ContentStreamId $contentStreamId): ContentStreamState
{
// TODO implement
return ContentStreamState::STATE_FORKED;
}

public function findWorkspaceByName(WorkspaceName $workspaceName): ?Workspace
{
// TODO implement
return null;
}

/**
Expand Down
Loading
Loading