Skip to content

Commit

Permalink
FEATURE: Overhaul ContentCacheFlusher
Browse files Browse the repository at this point in the history
  • Loading branch information
dlubitz committed Aug 27, 2024
1 parent c40edde commit c40e88b
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?php

namespace Neos\Neos\Fusion\Cache;

use Neos\Media\Domain\Model\AssetInterface;
use Neos\Media\Domain\Model\AssetVariantInterface;
use Neos\Neos\AssetUsage\Dto\AssetUsageFilter;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateIds;
use Neos\Neos\AssetUsage\GlobalAssetUsageService;
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\Flow\Persistence\PersistenceManagerInterface;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\Projection\ContentGraph\NodeAggregate;
use Neos\ContentRepository\Core\ContentRepository;

class AssetChangeHandlerForCacheFlushing
{
public function __construct(
protected readonly GlobalAssetUsageService $globalAssetUsageService,
protected readonly ContentRepositoryRegistry $contentRepositoryRegistry,
protected readonly PersistenceManagerInterface $persistenceManager,
protected readonly ContentCacheFlusher $contentCacheFlusher,
) {
}

/**
* Fetches possible usages of the asset and registers nodes that use the asset as changed.
*/
public function registerAssetChange(AssetInterface $asset): void
{
// In Nodes only assets are referenced, never asset variants directly. When an asset
// variant is updated, it is passed as $asset, but since it is never "used" by any node
// no flushing of corresponding entries happens. Thus we instead use the original asset
// of the variant.
if ($asset instanceof AssetVariantInterface) {
$asset = $asset->getOriginalAsset();
}

$filter = AssetUsageFilter::create()
->withAsset($this->persistenceManager->getIdentifierByObject($asset))
->includeVariantsOfAsset();

$workspaceNamesByContentStreamId = [];
foreach ($this->globalAssetUsageService->findByFilter($filter) as $contentRepositoryId => $usages) {
$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryId));
foreach ($usages as $usage) {
// TODO: Remove when WorkspaceName is part of the AssetUsageProjection
$workspaceName = $workspaceNamesByContentStreamId[$contentRepositoryId][$usage->contentStreamId->value] ?? null;
if ($workspaceName === null) {
$workspace = $contentRepository->getWorkspaceFinder()->findOneByCurrentContentStreamId($usage->contentStreamId);
if ($workspace === null) {
continue;
}
$workspaceName = $workspace->workspaceName;
$workspaceNamesByContentStreamId[$contentRepositoryId][$usage->contentStreamId->value] = $workspaceName;
}
//

$nodeAggregate = $contentRepository->getContentGraph($workspaceName)->findNodeAggregateById($usage->nodeAggregateId);
if ($nodeAggregate === null) {
continue;
}
$flushNodeAggregateRequest = FlushNodeAggregateRequest::create(
$contentRepository->id,
$workspaceName,
$nodeAggregate->nodeAggregateId,
$nodeAggregate->nodeTypeName,
$this->determineParentNodeAggregateIds($contentRepository, $workspaceName, $nodeAggregate->nodeAggregateId),
);
$this->contentCacheFlusher->flushNodeAggregate($flushNodeAggregateRequest, CacheFlushingStrategy::ON_SHUTDOWN);
}
}
}

private function determineParentNodeAggregateIds(ContentRepository $contentRepository, WorkspaceName $workspaceName, NodeAggregateId $childNodeAggregateId): NodeAggregateIds
{
$parentNodeAggregates = $contentRepository->getContentGraph($workspaceName)->findParentNodeAggregates($childNodeAggregateId);
$parentNodeAggregateIds = NodeAggregateIds::fromArray(
array_map(static fn (NodeAggregate $parentNodeAggregate) => $parentNodeAggregate->nodeAggregateId, iterator_to_array($parentNodeAggregates))
);

foreach ($parentNodeAggregateIds as $parentNodeAggregateId) {
// Prevent infinite loops
if (!$parentNodeAggregateIds->contain($parentNodeAggregateId)) {
$parentNodeAggregateIds->merge($this->determineParentNodeAggregateIds($contentRepository, $workspaceName, $parentNodeAggregateId));
}
}

return $parentNodeAggregateIds;
}
}
11 changes: 11 additions & 0 deletions Neos.Neos/Classes/Fusion/Cache/CacheFlushingStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Neos\Neos\Fusion\Cache;

enum CacheFlushingStrategy
{
case IMMEDIATELY;
case ON_SHUTDOWN;
}
117 changes: 38 additions & 79 deletions Neos.Neos/Classes/Fusion/Cache/ContentCacheFlusher.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,28 @@

namespace Neos\Neos\Fusion\Cache;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\NodeType\NodeType;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\ContentGraph\NodeAggregate;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Exception\NodeTypeNotFound;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Persistence\PersistenceManagerInterface;
use Neos\Fusion\Core\Cache\ContentCache;
use Neos\Media\Domain\Model\AssetInterface;
use Neos\Media\Domain\Model\AssetVariantInterface;
use Neos\Neos\AssetUsage\Dto\AssetUsageFilter;
use Neos\Neos\AssetUsage\GlobalAssetUsageService;
use Psr\Log\LoggerInterface;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateIds;

/**
* This service flushes Fusion content caches triggered by node changes.
*
* It is called when the projection changes: In this case, it is triggered by
* {@see GraphProjectorCatchUpHookForCacheFlushing} which calls this method..
* {@see GraphProjectorCatchUpHookForCacheFlushing} which calls this method.
* This is the relevant case if publishing a workspace
* - where we f.e. need to flush the cache for Live.
*
* The {@see AssetChangeHandlerForCacheFlushing} also calls this ContentCacheFlusher
* to flush the caches of all Nodes using a given asset that has changed.
*
*/
#[Flow\Scope('singleton')]
class ContentCacheFlusher
Expand All @@ -56,17 +51,17 @@ class ContentCacheFlusher
public function __construct(
protected readonly ContentCache $contentCache,
protected readonly LoggerInterface $systemLogger,
protected readonly GlobalAssetUsageService $globalAssetUsageService,
protected readonly ContentRepositoryRegistry $contentRepositoryRegistry,
protected readonly PersistenceManagerInterface $persistenceManager,
) {
}

/**
* Main entry point to *directly* flush the caches of a given workspaceName
* Main entry point to flush the caches of a given workspaceName with a given strategy.
*/
public function flushWorkspace(
FlushWorkspaceRequest $flushWorkspaceRequest
FlushWorkspaceRequest $flushWorkspaceRequest,
CacheFlushingStrategy $cacheFlushingStrategy
): void {
$tagsToFlush[ContentCache::TAG_EVERYTHING] = 'which were tagged with "Everything".';

Expand All @@ -76,14 +71,15 @@ public function flushWorkspace(
$nodeCacheIdentifier->value
);

$this->flushTags($tagsToFlush);
$this->flushTags($tagsToFlush, $cacheFlushingStrategy);
}

/**
* Main entry point to *directly* flush the caches of a given NodeAggregate
* Main entry point to flush the caches of a given NodeAggregate with a given strategy.
*/
public function flushNodeAggregate(
FlushNodeAggregateRequest $flushNodeAggregateRequest
FlushNodeAggregateRequest $flushNodeAggregateRequest,
CacheFlushingStrategy $cacheFlushingStrategy
): void {
$tagsToFlush[ContentCache::TAG_EVERYTHING] = 'which were tagged with "Everything".';

Expand All @@ -92,7 +88,7 @@ public function flushNodeAggregate(
$tagsToFlush
);

$this->flushTags($tagsToFlush);
$this->flushTags($tagsToFlush, $cacheFlushingStrategy);
}

/**
Expand Down Expand Up @@ -194,13 +190,26 @@ private function collectTagsForChangeOnNodeType(
return $tagsToFlush;
}

/**
* Flush caches according to the given tags and strategy.
*
* @param array<string,string> $tagsToFlush
*/
protected function flushTags(array $tagsToFlush, CacheFlushingStrategy $cacheFlushingStrategy): void
{
match ($cacheFlushingStrategy) {
CacheFlushingStrategy::IMMEDIATELY => $this->flushTagsImmediately($tagsToFlush),
CacheFlushingStrategy::ON_SHUTDOWN => $this->collectTagsForFlushOnShutdown($tagsToFlush)
};
}


/**
* Flush caches according to the given tags.
* Flush caches according to the given tags immediately.
*
* @param array<string,string> $tagsToFlush
*/
protected function flushTags(array $tagsToFlush): void
protected function flushTagsImmediately(array $tagsToFlush): void
{
if ($this->debugMode) {
foreach ($tagsToFlush as $tag => $logMessage) {
Expand All @@ -219,6 +228,16 @@ protected function flushTags(array $tagsToFlush): void
}
}

/**
* Collect tags to get flushed on shutdown.
*
* @param array<string,string> $tagsToFlush
*/
protected function collectTagsForFlushOnShutdown(array $tagsToFlush): void
{
$this->tagsToFlushAfterPersistance = array_merge($tagsToFlush, $this->tagsToFlushAfterPersistance);
}

/**
* @param NodeType $nodeType
* @return array<string>
Expand All @@ -237,72 +256,12 @@ function (array $types, NodeType $superType) use ($self) {
return array_unique($types);
}


/**
* Fetches possible usages of the asset and registers nodes that use the asset as changed.
*
* @throws NodeTypeNotFound
*/
// TODO: Move out of the ContentCacheFlusher and
public function registerAssetChange(AssetInterface $asset): void
{
// In Nodes only assets are referenced, never asset variants directly. When an asset
// variant is updated, it is passed as $asset, but since it is never "used" by any node
// no flushing of corresponding entries happens. Thus we instead use the original asset
// of the variant.
if ($asset instanceof AssetVariantInterface) {
$asset = $asset->getOriginalAsset();
}

$tagsToFlush = [];
$filter = AssetUsageFilter::create()
->withAsset($this->persistenceManager->getIdentifierByObject($asset))
->includeVariantsOfAsset();


$workspaceNamesByContentStreamId = [];
foreach ($this->globalAssetUsageService->findByFilter($filter) as $contentRepositoryId => $usages) {
foreach ($usages as $usage) {
// TODO: Remove when WorkspaceName is part of the AssetUsageProjection
$workspaceName = $workspaceNamesByContentStreamId[$contentRepositoryId][$usage->contentStreamId->value] ?? null;
if ($workspaceName === null) {
$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryId));
$workspace = $contentRepository->getWorkspaceFinder()->findOneByCurrentContentStreamId($usage->contentStreamId);
if ($workspace === null) {
continue;
}
$workspaceName = $workspace->workspaceName;
$workspaceNamesByContentStreamId[$contentRepositoryId][$usage->contentStreamId->value] = $workspaceName;
}
//

$flushNodeAggregateRequest = FlushNodeAggregateRequest::create(
ContentRepositoryId::fromString($contentRepositoryId),
$workspaceName,
$usage->nodeAggregateId,
NodeTypeName::fromString("Neos.Neos:Content"),
NodeAggregateIds::create(),
);

$tagsToFlush = array_merge(
$this->collectTagsForChangeOnNodeAggregate(
$flushNodeAggregateRequest,
true
),
$tagsToFlush
);
}
}

$this->tagsToFlushAfterPersistance = array_merge($tagsToFlush, $this->tagsToFlushAfterPersistance);
}

/**
* Flush caches according to the previously registered changes.
*/
public function flushCollectedTags(): void
{
$this->flushTags($this->tagsToFlushAfterPersistance);
$this->flushTagsImmediately($this->tagsToFlushAfterPersistance);
$this->tagsToFlushAfterPersistance = [];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\EventStore\Model\EventEnvelope;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateIds;
use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasDiscarded;
use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPartiallyDiscarded;
Expand Down Expand Up @@ -263,13 +262,13 @@ private function determineParentNodeAggregateIds(WorkspaceName $workspaceName, N
public function onBeforeBatchCompleted(): void
{
foreach ($this->flushNodeAggregateRequestsOnBeforeBatchCompleted as $index => $request) {
$this->contentCacheFlusher->flushNodeAggregate($request);
$this->contentCacheFlusher->flushNodeAggregate($request, CacheFlushingStrategy::IMMEDIATELY);
$this->flushNodeAggregateRequestsOnAfterCatchUp[$index] = $request;
}
$this->flushNodeAggregateRequestsOnBeforeBatchCompleted = [];

foreach ($this->flushWorkspaceRequestsOnBeforeBatchCompleted as $index => $request) {
$this->contentCacheFlusher->flushWorkspace($request);
$this->contentCacheFlusher->flushWorkspace($request, CacheFlushingStrategy::IMMEDIATELY);
$this->flushWorkspaceRequestsOnAfterCatchUp[$index] = $request;
}
$this->flushWorkspaceRequestsOnBeforeBatchCompleted = [];
Expand All @@ -278,12 +277,12 @@ public function onBeforeBatchCompleted(): void
public function onAfterCatchUp(): void
{
foreach ($this->flushNodeAggregateRequestsOnAfterCatchUp as $request) {
$this->contentCacheFlusher->flushNodeAggregate($request);
$this->contentCacheFlusher->flushNodeAggregate($request, CacheFlushingStrategy::IMMEDIATELY);
}
$this->flushNodeAggregateRequestsOnAfterCatchUp = [];

foreach ($this->flushWorkspaceRequestsOnAfterCatchUp as $request) {
$this->contentCacheFlusher->flushWorkspace($request);
$this->contentCacheFlusher->flushWorkspace($request, CacheFlushingStrategy::IMMEDIATELY);
}
$this->flushWorkspaceRequestsOnAfterCatchUp = [];
}
Expand Down
6 changes: 2 additions & 4 deletions Neos.Neos/Classes/Package.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

namespace Neos\Neos;

use Neos\EventStore\Model\EventEnvelope;
use Neos\Flow\Cache\CacheManager;
use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Monitor\FileMonitor;
Expand All @@ -26,14 +25,13 @@
use Neos\Neos\Controller\Backend\ContentController;
use Neos\Neos\Domain\Model\Site;
use Neos\Neos\Domain\Service\SiteService;
use Neos\Neos\FrontendRouting\Projection\DocumentUriPathProjection;
use Neos\Neos\Routing\Cache\RouteCacheFlusher;
use Neos\Neos\Fusion\Cache\ContentCacheFlusher;
use Neos\Fusion\Core\Cache\ContentCache;
use Neos\Neos\Service\EditorContentStreamZookeeper;
use Neos\Media\Domain\Model\AssetInterface;
use Neos\Neos\AssetUsage\GlobalAssetUsageService;
use Neos\Flow\Persistence\PersistenceManagerInterface;
use Neos\Neos\Fusion\Cache\AssetChangeHandlerForCacheFlushing;

/**
* The Neos Package
Expand Down Expand Up @@ -102,7 +100,7 @@ function (
$dispatcher->connect(
AssetService::class,
'assetUpdated',
ContentCacheFlusher::class,
AssetChangeHandlerForCacheFlushing::class,
'registerAssetChange',
false
);
Expand Down

0 comments on commit c40e88b

Please sign in to comment.