From 783ec49ee3d861784a798d4f840665b3aa42e3ba Mon Sep 17 00:00:00 2001 From: dantb Date: Fri, 12 Jan 2024 09:35:21 +0000 Subject: [PATCH] Replace Envelope with Elem.SuccessElem (#4629) * Replace Envelope with Elem.SuccessElem * Remove event streaming from global tables * Separate global/scoped event streaming, remove unused methods * Remove more unused streaming methods * Make successful streams of elements explicit in type * Fix coordinator tests * Renames * Remove GlobalStateValue, parse from DB directly --- .../blazegraph/BlazegraphDeletionTask.scala | 2 +- .../blazegraph/BlazegraphIndexingAction.scala | 4 +- .../plugins/blazegraph/BlazegraphViews.scala | 25 ++-- .../indexing/BlazegraphCoordinator.scala | 6 +- .../BlazegraphIndexingActionSuite.scala | 16 +-- .../indexing/BlazegraphCoordinatorSuite.scala | 55 +++------ .../compositeviews/CompositeViews.scala | 11 +- .../ElasticSearchIndexingAction.scala | 4 +- .../elasticsearch/ElasticSearchViews.scala | 23 ++-- .../EventMetricsProjection.scala | 6 +- .../deletion/ElasticSearchDeletionTask.scala | 2 +- .../indexing/ElasticSearchCoordinator.scala | 6 +- .../ElasticSearchDefaultViewsResetter.scala | 4 +- .../ElasticSearchIndexingActionSuite.scala | 16 +-- .../ElasticSearchCoordinatorSuite.scala | 54 +++----- ...asticSearchDefaultViewsResetterSuite.scala | 4 +- .../elasticsearch/metrics/MetricsStream.scala | 24 ++-- .../delta/plugins/storage/files/Files.scala | 24 ++-- .../plugins/storage/storages/Storages.scala | 4 +- .../bluebrain/nexus/delta/sdk/acls/Acls.scala | 20 +-- .../nexus/delta/sdk/acls/AclsImpl.scala | 10 +- .../delta/sdk/projects/ProjectsImpl.scala | 4 +- .../nexus/delta/sdk/sse/SseEventLog.scala | 11 +- .../nexus/delta/sdk/acls/AclsImplSpec.scala | 58 +-------- .../delta/sdk/sse/SseEventLogSuite.scala | 38 +++--- .../nexus/delta/sourcing/GlobalEventLog.scala | 40 +----- .../nexus/delta/sourcing/ScopedEventLog.scala | 38 ++---- .../delta/sourcing/event/EventStreaming.scala | 78 ++++++++++-- 
.../sourcing/event/GlobalEventStore.scala | 62 ++-------- .../sourcing/event/ScopedEventStore.scala | 61 ++-------- .../nexus/delta/sourcing/model/Envelope.scala | 115 ------------------ .../nexus/delta/sourcing/model/package.scala | 2 +- .../sourcing/state/GlobalStateStore.scala | 40 ++---- .../sourcing/state/ScopedStateStore.scala | 68 ++--------- .../nexus/delta/sourcing/stream/Elem.scala | 43 +++++-- .../delta/sourcing/stream/Operation.scala | 4 +- .../delta/sourcing/GlobalEventLogSuite.scala | 9 -- .../event/GlobalEventStoreSuite.scala | 24 +--- .../event/ScopedEventStoreSuite.scala | 39 +----- .../state/GlobalStateStoreSuite.scala | 22 +--- .../state/ScopedStateStoreSuite.scala | 44 ++++--- 41 files changed, 333 insertions(+), 787 deletions(-) delete mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDeletionTask.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDeletionTask.scala index 3ee6c1483f..fdc62c25db 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDeletionTask.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDeletionTask.scala @@ -43,7 +43,7 @@ object BlazegraphDeletionTask { def apply(views: BlazegraphViews) = new BlazegraphDeletionTask( - project => views.currentIndexingViews(project).evalMapFilter(_.toIO), + project => views.currentIndexingViews(project).map(_.value), (v: ActiveViewDef, subject: Subject) => views.internalDeprecate(v.ref.viewId, v.ref.project, v.rev)(subject).handleErrorWith { r => logger.error(s"Deprecating '$v' resulted in error: '$r'.") diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala 
b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala index f4829fc95e..503fbeb39c 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag} import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -27,7 +27,7 @@ import scala.concurrent.duration.FiniteDuration * a maximum duration for the indexing */ final class BlazegraphIndexingAction( - fetchCurrentViews: ProjectRef => ElemStream[IndexingViewDef], + fetchCurrentViews: ProjectRef => SuccessElemStream[IndexingViewDef], compilePipeChain: PipeChain => Either[ProjectionErr, Operation], sink: ActiveViewDef => Sink, override val timeout: FiniteDuration diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala index 5aeb0a1944..9b95aa439f 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala @@ -35,6 +35,7 @@ import 
ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import io.circe.Json import java.util.UUID @@ -318,29 +319,21 @@ final class BlazegraphViews( /** * Return the existing indexing views in a project in a finite stream */ - def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] = - log.currentStates(Scope.Project(project)).evalMapFilter { envelope => - IO.pure(toIndexViewDef(envelope)) - } - - /** - * Return all existing indexing views in a finite stream - */ - def currentIndexingViews: ElemStream[IndexingViewDef] = - log.currentStates(Scope.Root).evalMapFilter { envelope => - IO.pure(toIndexViewDef(envelope)) + def currentIndexingViews(project: ProjectRef): SuccessElemStream[IndexingViewDef] = + log.currentStates(Scope.Project(project)).evalMapFilter { elem => + IO.pure(toIndexViewDef(elem)) } /** * Return the indexing views in a non-ending stream */ - def indexingViews(start: Offset): ElemStream[IndexingViewDef] = - log.states(Scope.Root, start).evalMapFilter { envelope => - IO.pure(toIndexViewDef(envelope)) + def indexingViews(start: Offset): SuccessElemStream[IndexingViewDef] = + log.states(Scope.Root, start).evalMapFilter { elem => + IO.pure(toIndexViewDef(elem)) } - private def toIndexViewDef(envelope: Envelope[BlazegraphViewState]) = - envelope.toElem { v => Some(v.project) }.traverse { v => + private def toIndexViewDef(elem: Elem.SuccessElem[BlazegraphViewState]) = + elem.withProject(elem.value.project).traverse { v => IndexingViewDef(v, prefix) } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala 
b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala index de8b5f7b49..0581027cbc 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream +import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -41,7 +41,7 @@ object BlazegraphCoordinator { * the general supervisor */ final private class Active( - fetchViews: Offset => ElemStream[IndexingViewDef], + fetchViews: Offset => SuccessElemStream[IndexingViewDef], graphStream: GraphResourceStream, compilePipeChain: PipeChain => Either[ProjectionErr, Operation], cache: LocalCache[ViewRef, ActiveViewDef], @@ -145,7 +145,7 @@ object BlazegraphCoordinator { } def apply( - fetchViews: Offset => ElemStream[IndexingViewDef], + fetchViews: Offset => SuccessElemStream[IndexingViewDef], graphStream: GraphResourceStream, compilePipeChain: PipeChain => Either[ProjectionErr, Operation], supervisor: Supervisor, diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala index 20575002ff..c35298b210 100644 --- 
a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} @@ -75,9 +75,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { ViewRef(project, id4) ) - private val id5 = nxv + "view5" - - private def viewStream: ElemStream[IndexingViewDef] = + private def viewStream: SuccessElemStream[IndexingViewDef] = Stream( SuccessElem( tpe = BlazegraphViews.entityType, @@ -114,14 +112,6 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { offset = Offset.at(4L), value = view4, rev = 1 - ), - DroppedElem( - tpe = BlazegraphViews.entityType, - id = id5, - project = Some(project), - Instant.EPOCH, - Offset.at(5L), - rev = 1 ) ) @@ -159,7 +149,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { ) test("Collect only the adequate views") { - val expected = IdAcc(Set(id1), Set(id2, id4, id5), Set(id3)) + val expected = IdAcc(Set(id1), Set(id2, id4), Set(id3)) indexingAction .projections(project, elem) diff --git 
a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala index c66ac43b78..19df5b9cf6 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala @@ -9,10 +9,10 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -65,7 +65,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture ViewRef(project, id3), projection = id3.toString, SelectFilter.latest, - Some(PipeChain(PipeRef.unsafe("xxx") -> ExpandedJsonLd.empty)), + Some(PipeChain(unknownPipe -> ExpandedJsonLd.empty)), namespace = "view3", indexingRev, rev @@ -86,8 +86,8 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture private val resumeSignal = SignallingRef[IO, 
Boolean](false).unsafeRunSync() - // Streams 4 elements until signal is set to true and then a failed item, 1 updated view and 1 deprecated view - private def viewStream: ElemStream[IndexingViewDef] = + // Streams 3 elements until signal is set to true, then 1 updated view and 1 deprecated view + private def viewStream: SuccessElemStream[IndexingViewDef] = Stream( SuccessElem( tpe = BlazegraphViews.entityType, @@ -98,20 +98,12 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture value = view1, rev = 1 ), - DroppedElem( - tpe = BlazegraphViews.entityType, - id = nxv + "dropped", - project = Some(project), - Instant.EPOCH, - Offset.at(2L), - rev = 1 - ), SuccessElem( tpe = BlazegraphViews.entityType, id = view2.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(3L), + offset = Offset.at(2L), value = view2, rev = 1 ), @@ -120,26 +112,17 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture id = view3.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(4L), + offset = Offset.at(3L), value = view3, rev = 1 ) ) ++ Stream.never[IO].interruptWhen(resumeSignal) ++ Stream( - FailedElem( - tpe = BlazegraphViews.entityType, - id = nxv + "failed_coord", - project = Some(project), - Instant.EPOCH, - Offset.at(5L), - new IllegalStateException("Something got wrong :("), - rev = 1 - ), SuccessElem( tpe = BlazegraphViews.entityType, id = deprecatedView1.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(6L), + offset = Offset.at(4L), value = deprecatedView1, rev = 1 ), @@ -148,17 +131,17 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture id = updatedView2.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(7L), + offset = Offset.at(5L), value = updatedView2, rev = 1 ), - // Elem at offset 8 represents a view update that does not require reindexing + // Elem at offset 
6 represents a view update that does not require reindexing SuccessElem( tpe = BlazegraphViews.entityType, id = updatedView2.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(8L), + offset = Offset.at(6L), value = updatedView2, rev = 1 ) @@ -187,7 +170,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture ) _ <- sv.describe(BlazegraphCoordinator.metadata.name) .map(_.map(_.progress)) - .assertEquals(Some(ProjectionProgress(Offset.at(4L), Instant.EPOCH, 4, 1, 1))) + .assertEquals(Some(ProjectionProgress(Offset.at(3L), Instant.EPOCH, 3, 0, 1))) .eventually } yield () } @@ -268,7 +251,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture _ <- resumeSignal.set(true) _ <- sv.describe(BlazegraphCoordinator.metadata.name) .map(_.map(_.progress)) - .assertEquals(Some(ProjectionProgress(Offset.at(8L), Instant.EPOCH, 8, 1, 2))) + .assertEquals(Some(ProjectionProgress(Offset.at(6L), Instant.EPOCH, 6, 0, 1))) .eventually } yield () } @@ -311,17 +294,9 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture } yield () } - test("Coordinator projection should have one error after failed elem offset 3") { - for { - entries <- projectionErrors.failedElemEntries(BlazegraphCoordinator.metadata.name, Offset.At(3L)).compile.toList - r = entries.assertOneElem - _ = assertEquals(r.failedElemData.id, nxv + "failed_coord") - } yield () - } - - test("View 2_2 projection should have one error after failed elem offset 3") { + test("View 2_2 projection should have one error after failed elem offset 2") { for { - entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(3L)).compile.toList + entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(2L)).compile.toList r = entries.assertOneElem _ = assertEquals(r.failedElemData.id, nxv + "failed") } yield () diff --git 
a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala index 7e9546835d..e9d94c85cf 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala @@ -34,6 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import io.circe.Json /** @@ -332,20 +333,14 @@ final class CompositeViews private ( def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] = log.currentStates(Scope.Project(project)).map(toCompositeViewDef) - /** - * Return all existing indexing views in a finite stream - */ - def currentViews: ElemStream[CompositeViewDef] = - log.currentStates(Scope.Root).map(toCompositeViewDef) - /** * Return the indexing views in a non-ending stream */ def views(start: Offset): ElemStream[CompositeViewDef] = log.states(Scope.Root, start).map(toCompositeViewDef) - private def toCompositeViewDef(envelope: Envelope[CompositeViewState]) = - envelope.toElem { v => Some(v.project) }.map { v => + private def toCompositeViewDef(elem: Elem.SuccessElem[CompositeViewState]) = + elem.withProject(elem.value.project).mapValue { v => CompositeViewDef(v) } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala index 
9e5b276d22..e6bd5b6007 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSear import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag} import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -28,7 +28,7 @@ import scala.concurrent.duration.FiniteDuration * a maximum duration for the indexing */ final class ElasticSearchIndexingAction( - fetchCurrentViews: ProjectRef => ElemStream[IndexingViewDef], + fetchCurrentViews: ProjectRef => SuccessElemStream[IndexingViewDef], compilePipeChain: PipeChain => Either[ProjectionErr, Operation], sink: ActiveViewDef => Sink, override val timeout: FiniteDuration diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala index 75e20459d5..a8081d5778 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala @@ -34,6 +34,7 @@ import 
ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import io.circe.Json import java.util.UUID @@ -367,35 +368,35 @@ final class ElasticSearchViews private ( /** * Return the existing indexing views in a project in a finite stream */ - def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] = + def currentIndexingViews(project: ProjectRef): SuccessElemStream[IndexingViewDef] = log .currentStates(Scope.Project(project)) - .evalMapFilter { envelope => - IO.pure(toIndexViewDef(envelope)) + .evalMapFilter { elem => + IO.pure(toIndexViewDef(elem)) } /** * Return all existing indexing views in a finite stream */ - def currentIndexingViews: ElemStream[IndexingViewDef] = + def currentIndexingViews: SuccessElemStream[IndexingViewDef] = log .currentStates(Scope.Root) - .evalMapFilter { envelope => - IO.pure(toIndexViewDef(envelope)) + .evalMapFilter { elem => + IO.pure(toIndexViewDef(elem)) } /** * Return the indexing views in a non-ending stream */ - def indexingViews(start: Offset): ElemStream[IndexingViewDef] = + def indexingViews(start: Offset): SuccessElemStream[IndexingViewDef] = log .states(Scope.Root, start) - .evalMapFilter { envelope => - IO.pure(toIndexViewDef(envelope)) + .evalMapFilter { elem => + IO.pure(toIndexViewDef(elem)) } - private def toIndexViewDef(envelope: Envelope[ElasticSearchViewState]) = - envelope.toElem { v => Some(v.project) }.traverse { v => + private def toIndexViewDef(elem: Elem.SuccessElem[ElasticSearchViewState]) = + elem.withProject(elem.value.project).traverse { v => IndexingViewDef(v, defaultElasticsearchMapping, defaultElasticsearchSettings, prefix) } diff --git 
a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala index 47577ce83a..a9f76d1b03 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.EventMetric._ import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.ScopedEventMetricEncoder import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, QueryConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreaming -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EnvelopeStream +import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -78,12 +78,12 @@ object EventMetricsProjection { def apply( sink: Sink, supervisor: Supervisor, - metrics: Offset => EnvelopeStream[ProjectScopedMetric], + metrics: Offset => SuccessElemStream[ProjectScopedMetric], init: IO[Unit] ): IO[EventMetricsProjection] = { val source = Source { (offset: Offset) => - metrics(offset).map { e => e.toElem { m => Some(m.project) } } + metrics(offset).map(e => e.withProject(e.value.project)) } val compiledProjection = diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/ElasticSearchDeletionTask.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/ElasticSearchDeletionTask.scala index 47b40c4214..91d4813c56 100644 --- 
a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/ElasticSearchDeletionTask.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/ElasticSearchDeletionTask.scala @@ -44,7 +44,7 @@ object ElasticSearchDeletionTask { def apply(views: ElasticSearchViews) = new ElasticSearchDeletionTask( - project => views.currentIndexingViews(project).evalMapFilter(_.toIO), + project => views.currentIndexingViews(project).map(_.value), (v: ActiveViewDef, subject: Subject) => views .internalDeprecate(v.ref.viewId, v.ref.project, v.rev)(subject) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala index 686fc36b02..704968af16 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.{HttpClientStatusError, HttpServerStatusError} import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream +import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -45,7 +45,7 @@ object ElasticSearchCoordinator { * the general supervisor */ final private class 
Active( - fetchViews: Offset => ElemStream[IndexingViewDef], + fetchViews: Offset => SuccessElemStream[IndexingViewDef], graphStream: GraphResourceStream, compilePipeChain: PipeChain => Either[ProjectionErr, Operation], cache: LocalCache[ViewRef, ActiveViewDef], @@ -166,7 +166,7 @@ object ElasticSearchCoordinator { } def apply( - fetchViews: Offset => ElemStream[IndexingViewDef], + fetchViews: Offset => SuccessElemStream[IndexingViewDef], graphStream: GraphResourceStream, compilePipeChain: PipeChain => Either[ProjectionErr, Operation], supervisor: Supervisor, diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala index e89aae900a..fc78676d5f 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala @@ -12,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import doobie.implicits._ /** A way to reset default Elasticsearch views */ @@ -53,7 +53,7 @@ object ElasticSearchDefaultViewsResetter { ) def apply( - views: ElemStream[IndexingViewDef], + views: SuccessElemStream[IndexingViewDef], deleteIndex: IndexLabel => IO[Boolean], createView: (Iri, ProjectRef, ElasticSearchViewValue) => IO[Unit], newViewValue: 
ElasticSearchViewValue, diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala index 918b157adf..19f6c7812b 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} @@ -87,9 +87,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with ViewRef(project, id4) ) - private val id5 = nxv + "view5" - - private def viewStream: ElemStream[IndexingViewDef] = + private def viewStream: SuccessElemStream[IndexingViewDef] = Stream( SuccessElem( tpe = ElasticSearchViews.entityType, @@ -126,14 +124,6 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with offset = Offset.at(4L), value = view4, rev = 1 - ), - DroppedElem( - tpe = ElasticSearchViews.entityType, - id = id5, - project = Some(project), - Instant.EPOCH, - Offset.at(5L), - rev = 1 ) ) @@ -171,7 +161,7 @@ class 
ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with ) test("Collect only the adequate views") { - val expected = IdAcc(Set(id1), Set(id2, id4, id5), Set(id3)) + val expected = IdAcc(Set(id1), Set(id2, id4), Set(id3)) indexingAction .projections(project, elem) diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala index a5ca3709c0..b0abfb0359 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala @@ -9,10 +9,10 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef} import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ @@ -72,7 +72,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt private val view3 = ActiveViewDef( 
ViewRef(project, id3), projection = id3.toString, - Some(PipeChain(PipeRef.unsafe("xxx") -> ExpandedJsonLd.empty)), + Some(PipeChain(unknownPipe -> ExpandedJsonLd.empty)), SelectFilter.latest, index = IndexLabel.unsafe("view3"), mapping = jobj"""{"properties": { }}""", @@ -99,8 +99,8 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt ) private val resumeSignal = SignallingRef[IO, Boolean](false).unsafeRunSync() - // Streams 4 elements until signal is set to true and then a failed item, 1 updated view and 1 deprecated view - private def viewStream: ElemStream[IndexingViewDef] = + // Streams 3 elements until signal is set to true, then 1 updated view and 1 deprecated view + private def viewStream: SuccessElemStream[IndexingViewDef] = Stream( SuccessElem( tpe = ElasticSearchViews.entityType, @@ -111,20 +111,12 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt value = view1, rev = 1 ), - DroppedElem( - tpe = ElasticSearchViews.entityType, - id = nxv + "dropped", - project = Some(project), - Instant.EPOCH, - Offset.at(2L), - rev = 1 - ), SuccessElem( tpe = ElasticSearchViews.entityType, id = view2.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(3L), + offset = Offset.at(2L), value = view2, rev = 1 ), @@ -133,26 +125,17 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt id = view3.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(4L), + offset = Offset.at(3L), value = view3, rev = 1 ) ) ++ Stream.never[IO].interruptWhen(resumeSignal) ++ Stream( - FailedElem( - tpe = ElasticSearchViews.entityType, - id = nxv + "failed_coord", - project = Some(project), - Instant.EPOCH, - Offset.at(5L), - new IllegalStateException("Something got wrong :("), - rev = 1 - ), SuccessElem( tpe = ElasticSearchViews.entityType, id = deprecatedView1.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = 
Offset.at(6L), + offset = Offset.at(4L), value = deprecatedView1, rev = 1 ), @@ -161,17 +144,17 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt id = updatedView2.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(7L), + offset = Offset.at(5L), value = updatedView2, rev = 1 ), - // Elem at offset 8 represents a view update that does not require reindexing + // Elem at offset 6 represents a view update that does not require reindexing SuccessElem( tpe = ElasticSearchViews.entityType, id = updatedView2.ref.viewId, project = Some(project), instant = Instant.EPOCH, - offset = Offset.at(8L), + offset = Offset.at(6L), value = updatedView2, rev = 1 ) @@ -206,7 +189,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt ) _ <- sv.describe(ElasticSearchCoordinator.metadata.name) .map(_.map(_.progress)) - .assertEquals(Some(ProjectionProgress(Offset.at(4L), Instant.EPOCH, 4, 1, 1))) + .assertEquals(Some(ProjectionProgress(Offset.at(3L), Instant.EPOCH, 3, 0, 1))) .eventually } yield () } @@ -281,7 +264,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt _ <- resumeSignal.set(true) _ <- sv.describe(ElasticSearchCoordinator.metadata.name) .map(_.map(_.progress)) - .assertEquals(Some(ProjectionProgress(Offset.at(8L), Instant.EPOCH, 8, 1, 2))) + .assertEquals(Some(ProjectionProgress(Offset.at(6L), Instant.EPOCH, 6, 0, 1))) .eventually } yield () } @@ -318,18 +301,9 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt } yield () } - test("Coordinator projection should have one error after failed elem offset 4") { - for { - entries <- - projectionErrors.failedElemEntries(ElasticSearchCoordinator.metadata.name, Offset.At(3L)).compile.toList - r = entries.assertOneElem - _ = assertEquals(r.failedElemData.id, nxv + "failed_coord") - } yield () - } - test("View 2_2 projection should have one error after failed elem offset 4") 
{ for { - entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(4L)).compile.toList + entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(3L)).compile.toList r = entries.assertOneElem _ = assertEquals(r.failedElemData.id, nxv + "failed") } yield () diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala index c6f02c5481..e2ee4624ae 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchVi import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, permissions} import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter @@ -144,7 +144,7 @@ class ElasticSearchDefaultViewsResetterSuite private val viewElem2 = viewElem1.copy(project = Some(project2), value = defaultView.copy(ref = ViewRef(project2, iri"$defaultEsViewId"))) - val viewStream: ElemStream[IndexingViewDef] = Stream(viewElem1, viewElem2) + val viewStream: 
SuccessElemStream[IndexingViewDef] = Stream(viewElem1, viewElem2) private lazy val resetWithNoViewCreation = ElasticSearchDefaultViewsResetter( viewStream, diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala index 06c20e6903..65a042965f 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala @@ -5,8 +5,9 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.EventMetric.{ProjectScopedMetric, _} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, Label, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import fs2.Stream import io.circe.{Json, JsonObject} @@ -105,22 +106,25 @@ object MetricsStream { ) ) - private val envelopes = List( + private val elems = List( // Create file in proj1 - Envelope(EntityType("entity"), nxv + "1", 1, metric1, Instant.EPOCH, Offset.At(1L)), + elem("1", 1L, metric1), // Update file in proj1 - Envelope(EntityType("entity"), nxv + "2", 1, metric2, Instant.EPOCH, Offset.At(2L)), + elem("2", 2L, metric2), // Tag file in proj1 - Envelope(EntityType("entity"), nxv + "3", 1, metric3, Instant.EPOCH, Offset.At(3L)), + elem("3", 3L, metric3), // Delete file tag in proj 1 - Envelope(EntityType("entity"), nxv + "4", 1, metric4, Instant.EPOCH, 
Offset.At(4L)), + elem("4", 4L, metric4), // Create file in proj 2 - Envelope(EntityType("entity"), nxv + "5", 1, metric5, Instant.EPOCH, Offset.At(5L)), + elem("5", 5L, metric5), // Deprecate file in proj 2 - Envelope(EntityType("entity"), nxv + "6", 1, metric6, Instant.EPOCH, Offset.At(6L)) + elem("6", 6L, metric6) ) - val metricsStream: Stream[IO, Envelope[ProjectScopedMetric]] = - Stream.emits(envelopes) + def elem(idSuffix: String, offset: Long, metric: ProjectScopedMetric) = + Elem.SuccessElem(EntityType("entity"), nxv + idSuffix, None, Instant.EPOCH, Offset.At(offset), metric, 1) + + val metricsStream: Stream[IO, Elem.SuccessElem[ProjectScopedMetric]] = + Stream.emits(elems) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index e03e7d7b7e..bda65e8600 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -505,27 +505,27 @@ final class Files( ) stream <- log .states(Scope.root, offset) - .map { envelope => - envelope.value match { + .map { elem => + elem.value match { case f if f.storageType == StorageType.RemoteDiskStorage && !f.attributes.digest.computed && !f.deprecated => SuccessElem( entityType, - envelope.id, - Some(envelope.value.project), - envelope.instant, - envelope.offset, + elem.id, + Some(elem.value.project), + elem.instant, + elem.offset, f, - envelope.rev + elem.rev ) case _ => DroppedElem( entityType, - envelope.id, - Some(envelope.value.project), - envelope.instant, - envelope.offset, - envelope.rev + elem.id, + Some(elem.value.project), + elem.instant, + elem.offset, + elem.rev ) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala 
b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala index b1ecf85514..f01b946fc6 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala @@ -286,8 +286,8 @@ final class Storages private ( * Return the existing storages in a project in a finite stream */ def currentStorages(project: ProjectRef): Stream[IO, Elem[StorageState]] = - log.currentStates(Scope.Project(project)).map { - _.toElem { s => Some(s.project) } + log.currentStates(Scope.Project(project)).map { e => + e.withProject(e.value.project) } private def unsetPreviousDefaultIfRequired( diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/Acls.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/Acls.scala index a7d32942cd..02ae0165c7 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/Acls.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/Acls.scala @@ -15,8 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceUris import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{IdentityRealm, Subject} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, EnvelopeStream, Label, ProjectRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.{GlobalEntityDefinition, StateMachine} import java.time.Instant @@ -142,23 +141,6 @@ trait Acls { */ def listSelf(filter: AclAddressFilter)(implicit caller: Caller): IO[AclCollection] - /** - * A non terminating stream of events for ACLs. 
After emitting all known events it sleeps until new events are - * recorded. - * - * @param offset - * the last seen event offset; it will not be emitted by the stream - */ - def events(offset: Offset = Offset.Start): EnvelopeStream[AclEvent] - - /** - * The current ACLs events. The stream stops after emitting all known events. - * - * @param offset - * the last seen event offset; it will not be emitted by the stream - */ - def currentEvents(offset: Offset = Offset.Start): EnvelopeStream[AclEvent] - /** * Overrides ''acl''. * diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala index 484f531aa0..341279f4b0 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala @@ -14,9 +14,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EnvelopeStream, Label} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore + final class AclsImpl private ( log: AclsLog, minimum: Set[Permission] @@ -68,11 +68,7 @@ final class AclsImpl private ( .map(_.filter(caller.identities)) .span("listSelfAcls", Map("withAncestors" -> filter.withAncestors)) - override def events(offset: Offset): EnvelopeStream[AclEvent] = log.events(offset) - - override def currentEvents(offset: Offset): EnvelopeStream[AclEvent] = log.currentEvents(offset) - - override def replace(acl: Acl, rev: Int)(implicit caller: Subject): IO[AclResource] = + override def replace(acl: Acl, rev: Int)(implicit caller: Subject): 
IO[AclResource] = eval(ReplaceAcl(acl, rev, caller)).span("replaceAcls") override def append(acl: Acl, rev: Int)(implicit caller: Subject): IO[AclResource] = diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala index 646c410ac0..8902468487 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala @@ -88,8 +88,8 @@ final class ProjectsImpl private ( log.currentStates(Scope.root).map(_.value.project) override def states(offset: Offset): ElemStream[ProjectState] = - log.states(Scope.root, offset).map { - _.toElem { p => Some(p.project) } + log.states(Scope.root, offset).map { e => + e.withProject { e.value.project } } private def eval(cmd: ProjectCommand): IO[ProjectResource] = diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala index 5f7ab99094..aa05f6723e 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreaming import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors} import fs2.Stream import io.circe.syntax.EncoderOps @@ -95,12 +96,12 @@ object SseEventLog { private val logger = Logger[SseEventLog] private[sse] def toServerSentEvent( - envelope: Envelope[SseData] + elem: Elem.SuccessElem[SseData] )(implicit 
jo: JsonKeyOrdering): ServerSentEvent = { - val data = envelope.value.data - envelope.offset match { - case Start => ServerSentEvent(defaultPrinter.print(data.asJson.sort), envelope.value.tpe) - case At(value) => ServerSentEvent(defaultPrinter.print(data.asJson.sort), envelope.value.tpe, value.toString) + val data = elem.value.data + elem.offset match { + case Start => ServerSentEvent(defaultPrinter.print(data.asJson.sort), elem.value.tpe) + case At(value) => ServerSentEvent(defaultPrinter.print(data.asJson.sort), elem.value.tpe, value.toString) } } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala index 554087345a..928756cce6 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala @@ -1,9 +1,9 @@ package ch.epfl.bluebrain.nexus.delta.sdk.acls import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Organization import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddressFilter.{AnyOrganization, AnyOrganizationAnyProject, AnyProject} -import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclEvent.{AclAppended, AclDeleted, AclReplaced, AclSubtracted} import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclRejection.{AclCannotContainEmptyPermissionCollection, AclIsEmpty, AclNotFound, NothingToBeUpdated, RevisionNotFound, UnknownPermissions} import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress, AclCollection, AclState} import ch.epfl.bluebrain.nexus.delta.sdk.generators.AclGen.resourceFor @@ -11,10 +11,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.generators.PermissionsGen import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import 
ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission -import ch.epfl.bluebrain.nexus.delta.sdk.{ConfigFixtures, SSEUtils} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Group, Subject} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec import org.scalatest.CancelAfterFailure @@ -197,60 +195,6 @@ class AclsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with Cance ) } - val allEvents = SSEUtils.extract( - (AclAddress.Root, AclAppended, 1L), - (AclAddress.Root, AclReplaced, 2L), - (AclAddress.Root, AclSubtracted, 3L), - (AclAddress.Root, AclDeleted, 4L), - (orgTarget, AclReplaced, 5L), - (orgTarget, AclAppended, 6L), - (AclAddress.Root, AclAppended, 7L), - (projectTarget, AclAppended, 8L), - (org2Target, AclAppended, 9L) - ) - - "get the different events from start" in { - val events = acls - .events() - .map { e => (e.value.address, e.valueClass, e.offset) } - .take(9L) - .compile - .toList - - events.accepted shouldEqual allEvents - } - - "get the different current events from start" in { - val events = acls - .currentEvents() - .map { e => (e.value.address, e.valueClass, e.offset) } - .compile - .toList - - events.accepted shouldEqual allEvents - } - - "get the different events from offset 2" in { - val events = acls - .events(Offset.at(2L)) - .map { e => (e.value.address, e.valueClass, e.offset) } - .take(9L - 2L) - .compile - .toList - - events.accepted shouldEqual allEvents.drop(2) - } - - "get the different current events from offset 2" in { - val events = acls - .currentEvents(Offset.at(2L)) - .map { e => (e.value.address, e.valueClass, e.offset) } - .compile - .toList - - events.accepted shouldEqual allEvents.drop(2) - } - "fail to fetch an ACL on nonexistent revision" in { 
acls.fetchAt(orgTarget, 10).rejected shouldEqual RevisionNotFound(10, 2) } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala index 9dc8f1976b..423711cd33 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala @@ -5,8 +5,9 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder.SseData -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import io.circe.JsonObject import io.circe.syntax.EncoderOps @@ -19,51 +20,54 @@ class SseEventLogSuite extends NexusSuite with ConfigFixtures { private val ref = ProjectRef.unsafe("org", "proj") - private def makeEnvelope(sseData: SseData) = Envelope( + private def makeSuccessElem(sseData: SseData) = Elem.SuccessElem( EntityType("Person"), nxv + "1", - 4, - sseData, + None, Instant.now(), - Offset.at(5L) + Offset.at(5L), + sseData, + 4 ) test("Should not inject project uuids") { - val envelope = makeEnvelope( + val elem = makeSuccessElem( SseData("Person", None, JsonObject("name" -> "John Doe".asJson)) ) assertEquals( - SseEventLog.toServerSentEvent(envelope), + SseEventLog.toServerSentEvent(elem), ServerSentEvent("""{"name":"John Doe"}""", "Person", "5") ) } test("Should not inject project uuids when the ref is unknown") { - val envelope = Envelope( + val elem = Elem.SuccessElem( EntityType("Person"), nxv + 
"1", - 4, - SseData("Person", Some(ProjectRef.unsafe("xxx", "xxx")), JsonObject("name" -> "John Doe".asJson)), + None, Instant.now(), - Offset.at(5L) + Offset.at(5L), + SseData("Person", Some(ProjectRef.unsafe("xxx", "xxx")), JsonObject("name" -> "John Doe".asJson)), + 4 ) assertEquals( - SseEventLog.toServerSentEvent(envelope), + SseEventLog.toServerSentEvent(elem), ServerSentEvent("""{"name":"John Doe"}""", "Person", "5") ) } test("Should inject project uuids when the ref is unknown") { - val envelope = Envelope( + val elem = Elem.SuccessElem( EntityType("Person"), nxv + "1", - 4, - SseData("Person", Some(ref), JsonObject("name" -> "John Doe".asJson)), + None, Instant.now(), - Offset.at(5L) + Offset.at(5L), + SseData("Person", Some(ref), JsonObject("name" -> "John Doe".asJson)), + 4 ) assertEquals( - SseEventLog.toServerSentEvent(envelope), + SseEventLog.toServerSentEvent(elem), ServerSentEvent( s"""{"name":"John Doe"}""", "Person", diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala index 0e42b5f66a..976c3367b4 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala @@ -6,7 +6,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.GlobalEvent import ch.epfl.bluebrain.nexus.delta.sourcing.event.GlobalEventStore -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EnvelopeStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.GlobalState @@ -82,32 +81,6 @@ trait 
GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection */ def delete(id: Id): IO[Unit] - /** - * Allow to stream all current events within [[Envelope]] s - * @param offset - * offset to start from - */ - def currentEvents(offset: Offset): EnvelopeStream[E] - - /** - * Allow to stream all current events within [[Envelope]] s - * @param offset - * offset to start from - */ - def events(offset: Offset): EnvelopeStream[E] - - /** - * Allow to stream all latest states within [[Envelope]] s without applying transformation - * @param offset - * offset to start from - */ - def currentStates(offset: Offset): EnvelopeStream[S] - - /** - * Allow to stream all latest states from the beginning within [[Envelope]] s without applying transformation - */ - def currentStates: EnvelopeStream[S] = currentStates(Offset.Start) - /** * Allow to stream all latest states from the provided offset * @param offset @@ -122,7 +95,7 @@ trait GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection * @param f * the function to apply on each state */ - def currentStates[T](f: S => T): Stream[IO, T] = currentStates(Offset.Start, f) + final def currentStates[T](f: S => T): Stream[IO, T] = currentStates(Offset.Start, f) } object GlobalEventLog { @@ -189,17 +162,8 @@ object GlobalEventLog { override def delete(id: Id): IO[Unit] = (stateStore.delete(id) >> eventStore.delete(id)).transact(xas.write) - override def currentEvents(offset: Offset): EnvelopeStream[E] = eventStore.currentEvents(offset) - - override def events(offset: Offset): EnvelopeStream[E] = eventStore.events(offset) - override def currentStates[T](offset: Offset, f: S => T): Stream[IO, T] = - currentStates(offset).map { e => - f(e.value) - } - - override def currentStates(offset: Offset): EnvelopeStream[S] = stateStore.currentStates(offset) - + stateStore.currentStates(offset).map(f) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala index 23ba5d8d89..d4c62538ce 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala @@ -113,38 +113,20 @@ trait ScopedEventLog[Id, S <: ScopedState, Command, E <: ScopedEvent, Rejection def dryRun(ref: ProjectRef, id: Id, command: Command): IO[(E, S)] /** - * Allow to stream all current events within [[Envelope]] s - * @param scope - * to filter returned events - * @param offset - * offset to start from - */ - def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E] - - /** - * Allow to stream all current events within [[Envelope]] s - * @param scope - * to filter returned events - * @param offset - * offset to start from - */ - def events(scope: Scope, offset: Offset): EnvelopeStream[E] - - /** - * Allow to stream all latest states within [[Envelope]] s without applying transformation + * Allow to stream all latest states within [[Elem.SuccessElem]] s without applying transformation * @param scope * to filter returned states * @param offset * offset to start from */ - def currentStates(scope: Scope, offset: Offset): EnvelopeStream[S] + def currentStates(scope: Scope, offset: Offset): SuccessElemStream[S] /** - * Allow to stream all latest states from the beginning within [[Envelope]] s without applying transformation + * Allow to stream all latest states from the beginning within [[Elem.SuccessElem]] s without applying transformation * @param scope * to filter returned states */ - def currentStates(scope: Scope): EnvelopeStream[S] = currentStates(scope, Offset.Start) + def currentStates(scope: Scope): SuccessElemStream[S] = currentStates(scope, Offset.Start) /** * Allow to stream all current states from the provided offset @@ -173,7 +155,7 @@ trait ScopedEventLog[Id, S <: ScopedState, Command, E <: 
ScopedEvent, Rejection * @param offset * the start offset */ - def states(scope: Scope, offset: Offset): EnvelopeStream[S] + def states(scope: Scope, offset: Offset): SuccessElemStream[S] } object ScopedEventLog { @@ -310,13 +292,7 @@ object ScopedEventLog { stateMachine.evaluate(state, command, maxDuration) } - override def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E] = - eventStore.currentEvents(scope, offset) - - override def events(scope: Scope, offset: Offset): EnvelopeStream[E] = - eventStore.events(scope, offset) - - override def currentStates(scope: Scope, offset: Offset): EnvelopeStream[S] = + override def currentStates(scope: Scope, offset: Offset): SuccessElemStream[S] = stateStore.currentStates(scope, offset) override def currentStates[T](scope: Scope, offset: Offset, f: S => T): Stream[IO, T] = @@ -324,7 +300,7 @@ object ScopedEventLog { f(s.value) } - override def states(scope: Scope, offset: Offset): EnvelopeStream[S] = + override def states(scope: Scope, offset: Offset): SuccessElemStream[S] = stateStore.states(scope, offset) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala index 5e54e29eb5..dc05dac706 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala @@ -1,12 +1,16 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event import cats.data.NonEmptyList +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, SuccessElemStream} import 
ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.query.StreamingQuery +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors} import doobie.implicits._ +import doobie.util.query.Query0 import doobie.{Fragment, Fragments} import io.circe.Json @@ -18,21 +22,81 @@ object EventStreaming { offset: Offset, config: QueryConfig, xas: Transactors - )(implicit md: MultiDecoder[A]): EnvelopeStream[A] = { + )(implicit md: MultiDecoder[A]): SuccessElemStream[A] = { val typeIn = NonEmptyList.fromList(types).map { types => Fragments.in(fr"type", types) } - Envelope.streamA( + streamA( offset, - offset => scopedEvents(typeIn, scope, offset, config).query[Envelope[Json]], + offset => scopedEvents(typeIn, scope, offset, config), xas, config ) } - private def scopedEvents(typeIn: Option[Fragment], scope: Scope, o: Offset, cfg: QueryConfig) = - fr"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_events + /** + * Stream results for the provided query from the start offset. The refresh strategy in the query configuration + * defines if the stream will re-execute the query with a delay after all the results have been consumed. Failure to + * decode a stream element (from json to A) will drop the element silently. + * + * @param start + * the start offset + * @param query + * the query function for an offset + * @param xas + * the transactor instances + * @param cfg + * the query configuration + * @param md + * a decoder collection indexed on the entity type for values of type A. 
+ * @tparam A + * the underlying value type + */ + private def streamA[A]( + start: Offset, + query: Offset => Query0[Elem.SuccessElem[Json]], + xas: Transactors, + cfg: QueryConfig + )(implicit md: MultiDecoder[A]): SuccessElemStream[A] = + streamFA(start, query, xas, cfg, (tpe, json) => IO.pure(md.decodeJson(tpe, json).toOption)) + + /** + * Stream results for the provided query from the start offset. The refresh strategy in the query configuration + * defines if the stream will re-execute the query with a delay after all the results have been consumed. + * + * @param start + * the start offset + * @param query + * the query function for an offset + * @param xas + * the transactor instances + * @param cfg + * the query configuration + * @param decode + * a decode function + * @tparam A + * the underlying value type + */ + private def streamFA[A]( + start: Offset, + query: Offset => Query0[Elem.SuccessElem[Json]], + xas: Transactors, + cfg: QueryConfig, + decode: (EntityType, Json) => IO[Option[A]] + ): SuccessElemStream[A] = + StreamingQuery[Elem.SuccessElem[Json]](start, query, _.offset, cfg, xas) + // evalMapFilter re-chunks to 1, the following 2 statements do the same but preserve the chunks + .evalMapChunk(e => decode(e.tpe, e.value).map(_.map(a => e.copy(value = a)))) + .collect { case Some(e) => e } + + private def scopedEvents( + typeIn: Option[Fragment], + scope: Scope, + o: Offset, + cfg: QueryConfig + ): Query0[Elem.SuccessElem[Json]] = + fr"""SELECT type, id, value, rev, instant, ordering, org, project FROM public.scoped_events |${Fragments.whereAndOpt(typeIn, scope.asFragment, o.asFragment)} |ORDER BY ordering - |LIMIT ${cfg.batchSize}""".stripMargin + |LIMIT ${cfg.batchSize}""".stripMargin.query[Elem.SuccessElem[Json]] } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala index 
ee24a3686d..84fda20b7b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala @@ -2,18 +2,17 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event import cats.effect.IO import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.GlobalEvent +import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery} + import doobie._ -import doobie.implicits._ import doobie.postgres.implicits._ + +import doobie.implicits._ import fs2.Stream -import io.circe.Decoder /** * Allows to save and fetch [[GlobalEvent]] s from the database @@ -38,34 +37,12 @@ trait GlobalEventStore[Id, E <: GlobalEvent] { /** * Fetches the history for the global event up to the provided revision */ - def history(id: Id, to: Int): Stream[IO, E] = history(id, Some(to)) + final def history(id: Id, to: Int): Stream[IO, E] = history(id, Some(to)) /** * Fetches the history for the global event up to the last existing revision */ - def history(id: Id): Stream[IO, E] = history(id, None) - - /** - * Fetches events from the given type from the provided offset. - * - * The stream is completed when it reaches the end . 
- * - * @param offset - * the offset - */ - def currentEvents(offset: Offset): EnvelopeStream[E] - - /** - * Fetches events from the given type from the provided offset - * - * The stream is not completed when it reaches the end of the existing events, but it continues to push new events - * when new events are persisted. - * - * @param offset - * the offset - */ - def events(offset: Offset): EnvelopeStream[E] - + final def history(id: Id): Stream[IO, E] = history(id, None) } object GlobalEventStore { @@ -77,12 +54,9 @@ object GlobalEventStore { xas: Transactors ): GlobalEventStore[Id, E] = new GlobalEventStore[Id, E] { - - import IriInstances._ - implicit val putId: Put[Id] = serializer.putId - implicit val getValue: Get[E] = serializer.getValue - implicit val putValue: Put[E] = serializer.putValue - implicit val decoder: Decoder[E] = serializer.codec + implicit val putId: Put[Id] = serializer.putId + implicit val getValue: Get[E] = serializer.getValue + implicit val putValue: Put[E] = serializer.putValue override def save(event: E): ConnectionIO[Unit] = sql""" @@ -113,22 +87,6 @@ object GlobalEventStore { select.query[E].streamWithChunkSize(config.batchSize).transact(xas.read) } - - private def events(offset: Offset, strategy: RefreshStrategy): Stream[IO, Envelope[E]] = - StreamingQuery[Envelope[E]]( - offset, - offset => sql"""SELECT type, id, value, rev, instant, ordering FROM public.global_events - |${Fragments.whereAndOpt(Some(fr"type = $tpe"), offset.asFragment)} - |ORDER BY ordering - |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[E]], - _.offset, - config.copy(refreshStrategy = strategy), - xas - ) - - override def currentEvents(offset: Offset): Stream[IO, Envelope[E]] = events(offset, RefreshStrategy.Stop) - - override def events(offset: Offset): Stream[IO, Envelope[E]] = events(offset, config.refreshStrategy) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala index 63b0787eaa..6c9ac3950e 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala @@ -4,16 +4,14 @@ import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent -import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream, ProjectRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery} -import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Scope, Serializer, Transactors} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Serializer, Transactors} import doobie._ +import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ + import doobie.implicits._ import doobie.postgres.implicits._ import fs2.Stream -import io.circe.Decoder /** * A @@ -51,25 +49,6 @@ trait ScopedEventStore[Id, E <: ScopedEvent] { * Fetches the history for the global event up to the last existing revision */ def history(ref: ProjectRef, id: Id): Stream[IO, E] = history(ref, id, None) - - /** - * Allow to stream all current events within [[Envelope]] s - * @param scope - * to filter returned events - * @param offset - * offset to start from - */ - def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E] - - /** - * Allow to stream all current events within [[Envelope]] s - * @param scope - * to filter returned events - * @param offset - * offset to start from - */ - def events(scope: Scope, offset: 
Offset): EnvelopeStream[E] - } object ScopedEventStore { @@ -81,12 +60,9 @@ object ScopedEventStore { xas: Transactors ): ScopedEventStore[Id, E] = new ScopedEventStore[Id, E] { - - import IriInstances._ - implicit val putId: Put[Id] = serializer.putId - implicit val getValue: Get[E] = serializer.getValue - implicit val putValue: Put[E] = serializer.putValue - implicit val decoder: Decoder[E] = serializer.codec + implicit val putId: Put[Id] = serializer.putId + implicit val getValue: Get[E] = serializer.getValue + implicit val putValue: Put[E] = serializer.putValue private def insertEvent(event: E) = sql""" @@ -127,28 +103,5 @@ object ScopedEventStore { select.query[E].streamWithChunkSize(config.batchSize).transact(xas.read) } - - private def events( - scope: Scope, - offset: Offset, - strategy: RefreshStrategy - ): EnvelopeStream[E] = - StreamingQuery[Envelope[E]]( - offset, - offset => sql"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_events - |${Fragments.whereAndOpt(Some(fr"type = $tpe"), scope.asFragment, offset.asFragment)} - |ORDER BY ordering - |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[E]], - _.offset, - config.copy(refreshStrategy = strategy), - xas - ) - - override def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E] = - events(scope, offset, RefreshStrategy.Stop) - - override def events(scope: Scope, offset: Offset): EnvelopeStream[E] = - events(scope, offset, config.refreshStrategy) - } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala deleted file mode 100644 index c7ab5c6356..0000000000 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala +++ /dev/null @@ -1,115 +0,0 @@ -package ch.epfl.bluebrain.nexus.delta.sourcing.model - -import cats.effect.IO -import 
ch.epfl.bluebrain.nexus.delta.kernel.utils.ClassUtils -import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.StreamingQuery -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem -import doobie._ -import doobie.postgres.implicits._ -import doobie.util.query.Query0 -import io.circe.{Decoder, Json} - -import java.time.Instant -import scala.annotation.nowarn - -/** - * Envelope adding metadata along the event/state - * @param tpe - * the entity type - * @param id - * the identifier - * @param rev - * the revision - * @param value - * the event/state - * @param instant - * the instant - * @param offset - * the offset - */ -final case class Envelope[+Value](tpe: EntityType, id: Iri, rev: Int, value: Value, instant: Instant, offset: Offset) { - - def valueClass: String = ClassUtils.simpleName(value) - - /** - * Translates the envelope for an elem - * @param project - * how to extract the project reference from the value - */ - def toElem(project: Value => Option[ProjectRef]): Elem[Value] = - SuccessElem(tpe, id, project(value), instant, offset, value, rev) - -} - -object Envelope { - - @nowarn("cat=unused") - implicit def envelopeRead[Value](implicit s: Decoder[Value]): Read[Envelope[Value]] = { - implicit val v: Get[Value] = pgDecoderGetT[Value] - Read[(EntityType, Iri, Value, Int, Instant, Long)].map { case (tpe, id, value, rev, instant, offset) => - Envelope(tpe, id, rev, value, instant, Offset.at(offset)) - } - } - - /** - * Stream results for the provided query from the start offset. 
The refresh strategy in the query configuration - * defines if the stream will re-execute the query with a delay after all the results have been consumed. Failure to - * decode a stream element (from json to A) will drop the element silently. - * - * @param start - * the start offset - * @param query - * the query function for an offset - * @param xas - * the transactor instances - * @param cfg - * the query configuration - * @param md - * a decoder collection indexed on the entity type for values of type A. - * @tparam A - * the underlying value type - */ - def streamA[A]( - start: Offset, - query: Offset => Query0[Envelope[Json]], - xas: Transactors, - cfg: QueryConfig - )(implicit md: MultiDecoder[A]): EnvelopeStream[A] = - streamFA(start, query, xas, cfg, (tpe, json) => IO.pure(md.decodeJson(tpe, json).toOption)) - - /** - * Stream results for the provided query from the start offset. The refresh strategy in the query configuration - * defines if the stream will re-execute the query with a delay after all the results have been consumed. 
- * - * @param start - * the start offset - * @param query - * the query function for an offset - * @param xas - * the transactor instances - * @param cfg - * the query configuration - * @param decode - * a decode function - * @tparam A - * the underlying value type - */ - def streamFA[A]( - start: Offset, - query: Offset => Query0[Envelope[Json]], - xas: Transactors, - cfg: QueryConfig, - decode: (EntityType, Json) => IO[Option[A]] - ): EnvelopeStream[A] = - StreamingQuery[Envelope[Json]](start, query, _.offset, cfg, xas) - // evalMapFilter re-chunks to 1, the following 2 statements do the same but preserve the chunks - .evalMapChunk(e => decode(e.tpe, e.value).map(_.map(a => e.copy(value = a)))) - .collect { case Some(e) => e } - -} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala index eb6d38d80b..e13ec6b3ce 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala @@ -6,7 +6,7 @@ import fs2.{Pipe, Stream} package object model { - type EnvelopeStream[Value] = Stream[IO, Envelope[Value]] + type SuccessElemStream[Value] = Stream[IO, Elem.SuccessElem[Value]] type ElemStream[Value] = Stream[IO, Elem[Value]] diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala index 302990f760..d15fb52ff4 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import 
ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery} import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.GlobalState @@ -15,7 +15,6 @@ import doobie.implicits._ import doobie.postgres.implicits._ import doobie.util.transactor.Transactor import fs2.Stream -import io.circe.Decoder /** * Allow to save and fetch [[GlobalState]] s from the database @@ -45,19 +44,7 @@ trait GlobalStateStore[Id, S <: GlobalState] { * @param offset * the offset */ - def currentStates(offset: Offset): EnvelopeStream[S] - - /** - * Fetches states from the given type from the provided offset - * - * The stream is not completed when it reaches the end of the existing events, but it continues to push new events - * when new events are persisted. 
- * - * @param offset - * the offset - */ - def states(offset: Offset): EnvelopeStream[S] - + def currentStates(offset: Offset): Stream[IO, S] } object GlobalStateStore { @@ -73,10 +60,9 @@ object GlobalStateStore { ): GlobalStateStore[Id, S] = new GlobalStateStore[Id, S] { import IriInstances._ - implicit val putId: Put[Id] = serializer.putId - implicit val getValue: Get[S] = serializer.getValue - implicit val putValue: Put[S] = serializer.putValue - implicit val decoder: Decoder[S] = serializer.codec + implicit val putId: Put[Id] = serializer.putId + implicit val getValue: Get[S] = serializer.getValue + implicit val putValue: Put[S] = serializer.putValue override def save(state: S): ConnectionIO[Unit] = { sql"SELECT 1 FROM global_states WHERE type = $tpe AND id = ${state.id}" @@ -122,21 +108,19 @@ object GlobalStateStore { .option .transact(xas.read) - private def states(offset: Offset, strategy: RefreshStrategy): EnvelopeStream[S] = - StreamingQuery[Envelope[S]]( + private def states(offset: Offset, strategy: RefreshStrategy): Stream[IO, S] = + StreamingQuery[(S, Long)]( offset, - offset => sql"""SELECT type, id, value, rev, instant, ordering FROM public.global_states + offset => sql"""SELECT value, ordering FROM public.global_states |${Fragments.whereAndOpt(Some(fr"type = $tpe"), offset.asFragment)} |ORDER BY ordering - |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[S]], - _.offset, + |LIMIT ${config.batchSize}""".stripMargin.query[(S, Long)], + { case (_, offset: Long) => Offset.at(offset) }, config.copy(refreshStrategy = strategy), xas - ) - - override def currentStates(offset: Offset): EnvelopeStream[S] = states(offset, RefreshStrategy.Stop) + ).map { case (value, _) => value } - override def states(offset: Offset): EnvelopeStream[S] = states(offset, config.refreshStrategy) + override def currentStates(offset: Offset): Stream[IO, S] = states(offset, RefreshStrategy.Stop) } } diff --git 
a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala index 6d7d61079d..d019bfd974 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery} import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore.StateNotFound.{TagNotFound, UnknownState} import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Scope, Serializer, Transactors} import doobie._ import doobie.implicits._ @@ -69,28 +70,6 @@ trait ScopedStateStore[Id, S <: ScopedState] { */ def get(ref: ProjectRef, id: Id, tag: Tag): IO[S] - /** - * Fetches latest states from the given type from the beginning. - * - * The stream is completed when it reaches the end. - * @param scope - * to filter returned states - */ - def currentStates(scope: Scope): EnvelopeStream[S] = - currentStates(scope, Offset.Start) - - /** - * Fetches states from the given type with the given tag from the beginning. - * - * The stream is completed when it reaches the end. - * @param scope - * to filter returned states - * @param tag - * only states with this tag will be selected - */ - def currentStates(scope: Scope, tag: Tag): EnvelopeStream[S] = - currentStates(scope, tag, Offset.Start) - /** * Fetches latest states from the given type from the provided offset. 
* @@ -100,7 +79,7 @@ trait ScopedStateStore[Id, S <: ScopedState] { * @param offset * the offset */ - def currentStates(scope: Scope, offset: Offset): EnvelopeStream[S] = + final def currentStates(scope: Scope, offset: Offset): SuccessElemStream[S] = currentStates(scope, Latest, offset) /** @@ -114,32 +93,7 @@ trait ScopedStateStore[Id, S <: ScopedState] { * @param offset * the offset */ - def currentStates(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] - - /** - * Fetches latest states from the given type from the beginning - * - * The stream is not completed when it reaches the end of the existing events, but it continues to push new events - * when new events are persisted. - * - * @param scope - * to filter returned states - */ - def states(scope: Scope): EnvelopeStream[S] = - states(scope, Latest, Offset.Start) - - /** - * Fetches states from the given type with the given tag from the beginning - * - * The stream is not completed when it reaches the end of the existing events, but it continues to push new events - * when new states are persisted. 
- * - * @param scope - * to filter returned states - * @param tag - * only states with this tag will be selected - */ - def states(scope: Scope, tag: Tag): EnvelopeStream[S] = states(scope, tag, Offset.Start) + def currentStates(scope: Scope, tag: Tag, offset: Offset): SuccessElemStream[S] /** * Fetches latest states from the given type from the provided offset @@ -152,7 +106,7 @@ trait ScopedStateStore[Id, S <: ScopedState] { * @param offset * the offset */ - def states(scope: Scope, offset: Offset): EnvelopeStream[S] = + final def states(scope: Scope, offset: Offset): SuccessElemStream[S] = states(scope, Latest, offset) /** @@ -168,7 +122,7 @@ trait ScopedStateStore[Id, S <: ScopedState] { * @param offset * the offset */ - def states(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] + def states(scope: Scope, tag: Tag, offset: Offset): SuccessElemStream[S] } @@ -278,24 +232,24 @@ object ScopedStateStore { tag: Tag, offset: Offset, strategy: RefreshStrategy - ): EnvelopeStream[S] = - StreamingQuery[Envelope[S]]( + ): SuccessElemStream[S] = + StreamingQuery[Elem.SuccessElem[S]]( offset, offset => // format: off - sql"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_states + sql"""SELECT type, id, value, rev, instant, ordering, org, project FROM public.scoped_states |${Fragments.whereAndOpt(Some(fr"type = $tpe"), scope.asFragment, Some(fr"tag = $tag"), offset.asFragment)} |ORDER BY ordering - |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[S]], + |LIMIT ${config.batchSize}""".stripMargin.query[Elem.SuccessElem[S]], _.offset, config.copy(refreshStrategy = strategy), xas ) - override def currentStates(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] = + override def currentStates(scope: Scope, tag: Tag, offset: Offset): SuccessElemStream[S] = states(scope, tag, offset, RefreshStrategy.Stop) - override def states(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] = + override def states(scope: Scope, tag: Tag, 
offset: Offset): SuccessElemStream[S] = states(scope, tag, offset, config.refreshStrategy) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala index d736a60309..8386f125e7 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala @@ -1,12 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.effect.IO +import cats.implicits.{toFoldableOps, toFunctorOps, toTraverseOps} import cats.{Applicative, Eval, Traverse} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} +import doobie.Read import io.circe.{Decoder, Encoder} import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder} @@ -91,9 +93,9 @@ sealed trait Elem[+A] extends Product with Serializable { * the mapping function */ def map[B](f: A => B): Elem[B] = this match { - case e: SuccessElem[A] => e.copy(value = f(e.value)) - case e: FailedElem => e - case e: DroppedElem => e + case s: SuccessElem[A] => s.mapValue(f) + case f: FailedElem => f + case d: DroppedElem => d } /** @@ -231,7 +233,34 @@ object Elem { offset: Offset, value: A, rev: Int - ) extends Elem[A] + ) extends Elem[A] { + def mapValue[B](f: A => B): Elem.SuccessElem[B] = copy(value = f(value)) + + def withProject(project: ProjectRef): Elem.SuccessElem[A] = this.copy(project = Some(project)) + } + + object SuccessElem { + @nowarn("cat=unused") + implicit def 
read[Value](implicit s: Decoder[Value]): Read[SuccessElem[Value]] = { + import doobie._ + import doobie.postgres.implicits._ + import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ + implicit val v: Get[Value] = pgDecoderGetT[Value] + Read[(EntityType, Iri, Value, Int, Instant, Long, String, String)].map { + case (tpe, id, value, rev, instant, offset, org, proj) => + SuccessElem(tpe, id, Some(ProjectRef.unsafe(org, proj)), instant, Offset.at(offset), value, rev) + } + } + + implicit val traverse: Traverse[SuccessElem] = new Traverse[SuccessElem] { + override def traverse[G[_]: Applicative, A, B](s: SuccessElem[A])(f: A => G[B]): G[SuccessElem[B]] = + Applicative[G].map(f(s.value))(v => s.copy(value = v)) + + override def foldLeft[A, B](s: SuccessElem[A], b: B)(f: (B, A) => B): B = f(b, s.value) + + override def foldRight[A, B](s: SuccessElem[A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = f(s.value, lb) + } + } /** * An element that has suffered a processing failure. @@ -279,20 +308,20 @@ object Elem { implicit val traverseElem: Traverse[Elem] = new Traverse[Elem] { override def traverse[G[_]: Applicative, A, B](fa: Elem[A])(f: A => G[B]): G[Elem[B]] = fa match { - case s: SuccessElem[A] => Applicative[G].map(f(s.value))(s.success) + case s: SuccessElem[A] => s.traverse(f).widen[Elem[B]] case dropped: DroppedElem => Applicative[G].pure(dropped) case failed: FailedElem => Applicative[G].pure(failed) } override def foldLeft[A, B](fa: Elem[A], b: B)(f: (B, A) => B): B = fa match { - case s: SuccessElem[A] => f(b, s.value) + case s: SuccessElem[A] => s.foldLeft(b)(f) case _ => b } override def foldRight[A, B](fa: Elem[A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = fa match { - case s: SuccessElem[A] => f(s.value, lb) + case s: SuccessElem[A] => s.foldRight(lb)(f) case _ => lb } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala index 4650e1703d..97c7def981 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala @@ -221,11 +221,11 @@ object Operation { def apply(element: SuccessElem[In]): IO[Elem[Out]] /** - * Checks if the provided envelope has a successful element value of type `I`. If true, it will return it in Right. + * Checks if the provided elem has a successful element value of type `I`. If true, it will return it in Right. * Otherwise it will return it in Left with the type `O`. This is safe because [[Elem]] is covariant. * * @param element - * an envelope with an Elem to be tested + * the elem to be tested */ protected def partitionSuccess[I, O](element: Elem[I]): Either[Elem[O], SuccessElem[I]] = element match { diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala index 48f1da7dd0..bd3e73d322 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala @@ -9,7 +9,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Arithmetic.{ArithmeticCommand, Ari import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.GlobalEventStore -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import
ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore @@ -127,12 +126,4 @@ class GlobalEventLogSuite extends NexusSuite with Doobie.Fixture { eventLog.stateOr(id, 10, NotFound, RevisionNotFound).interceptEquals(RevisionNotFound(10, 2)) } - test(s"Delete events and state for $id") { - for { - _ <- eventLog.delete(id) - _ <- eventLog.stateOr(id, 1, NotFound, RevisionNotFound).interceptEquals(NotFound) - _ <- eventLog.currentEvents(Offset.start).assertSize(0) - } yield () - } - } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala index cd22ed72e4..236a499c3e 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala @@ -8,8 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Arithmetic.ArithmeticEvent import ch.epfl.bluebrain.nexus.delta.sourcing.Arithmetic.ArithmeticEvent.{Minus, Plus} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite @@ -41,11 +40,6 @@ class GlobalEventStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A private val event3 = Plus(id, 3, 4, Instant.EPOCH, alice) private val event4 = Minus(id2, 1, 4, Instant.EPOCH, Anonymous) - private val envelope1 = Envelope(Arithmetic.entityType, id, 1, event1, Instant.EPOCH, 
Offset.at(1L)) - private val envelope2 = Envelope(Arithmetic.entityType, id, 2, event2, Instant.EPOCH, Offset.at(2L)) - private val envelope3 = Envelope(Arithmetic.entityType, id, 3, event3, Instant.EPOCH, Offset.at(3L)) - private val envelope4 = Envelope(Arithmetic.entityType, id2, 1, event4, Instant.EPOCH, Offset.at(4L)) - private def assertCount = sql"select count(*) from global_events".query[Int].unique.transact(xas.read).assertEquals(4) @@ -76,22 +70,6 @@ class GlobalEventStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A store.history(nxv + "xxx", 2).assertEmpty } - test("Fetch all current events from the beginning") { - store.currentEvents(Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) - } - - test("Fetch all current events from offset 2") { - store.currentEvents(Offset.at(2L)).assert(envelope3, envelope4) - } - - test("Fetch all events from the beginning") { - store.events(Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) - } - - test("Fetch all events from offset 2") { - store.events(Offset.at(2L)).assert(envelope3, envelope4) - } - test(s"Delete events for $id") { for { _ <- store.delete(id).transact(xas.write) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala index b2483deec4..1b1848fc19 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala @@ -3,15 +3,14 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv +import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest import 
ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestUpdated} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label, ProjectRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import ch.epfl.bluebrain.nexus.delta.sourcing.{PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ import munit.AnyFixture @@ -50,13 +49,6 @@ class ScopedEventStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A private val event6 = PullRequestCreated(id3, project3, Instant.EPOCH, Anonymous) - private val envelope1 = Envelope(PullRequest.entityType, id1, 1, event1, Instant.EPOCH, Offset.at(1L)) - private val envelope2 = Envelope(PullRequest.entityType, id1, 2, event2, Instant.EPOCH, Offset.at(2L)) - private val envelope3 = Envelope(PullRequest.entityType, id1, 3, event3, Instant.EPOCH, Offset.at(3L)) - private val envelope4 = Envelope(PullRequest.entityType, id2, 1, event4, Instant.EPOCH, Offset.at(4L)) - private val envelope5 = Envelope(PullRequest.entityType, id1, 1, event5, Instant.EPOCH, Offset.at(5L)) - private val envelope6 = Envelope(PullRequest.entityType, id3, 1, event6, Instant.EPOCH, Offset.at(6L)) - private def assertCount = sql"select count(*) from scoped_events".query[Int].unique.transact(xas.read).assertEquals(6) test("Save events") { @@ -87,31 +79,4 @@ class ScopedEventStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A test("Get an empty stream for an unknown (project, id)") { 
store.history(project2, id2, 2).assertEmpty } - - test("Fetch all current events from the beginning") { - store - .currentEvents(Scope.Root, Offset.Start) - .assert(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6) - } - - test("Fetch current events for `org` from offset 2") { - store.currentEvents(Scope.Org(Label.unsafe("org")), Offset.at(2L)).assert(envelope3, envelope4, envelope5) - } - - test("Fetch current events for `proj1` from the beginning") { - store.currentEvents(Scope.Project(project1), Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) - } - - test("Fetch all events from the beginning") { - store.events(Scope.Root, Offset.Start).assert(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6) - } - - test(s"Fetch current events for `${project1.organization}` from offset 2") { - store.events(Scope.Org(project1.organization), Offset.at(2L)).assert(envelope3, envelope4, envelope5) - } - - test(s"Fetch current events for `$project1` from the beginning") { - store.events(Scope.Project(project1), Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) - } - } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala index 501b8b76d5..2c1486a570 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Arithmetic import ch.epfl.bluebrain.nexus.delta.sourcing.Arithmetic.Total import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label} +import 
ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy @@ -40,10 +40,6 @@ class GlobalStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A private val state2 = Total(id2, 1, 12, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) private val updatedState1 = Total(id1, 2, 42, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val envelope1 = Envelope(Arithmetic.entityType, id1, 1, state1, Instant.EPOCH, Offset.at(1L)) - private val envelope2 = Envelope(Arithmetic.entityType, id2, 1, state2, Instant.EPOCH, Offset.at(2L)) - private val envelope3 = Envelope(Arithmetic.entityType, id1, 2, updatedState1, Instant.EPOCH, Offset.at(3L)) - private def assertCount(expected: Int) = sql"select count(*) from global_states".query[Int].unique.transact(xas.read).assertEquals(expected) @@ -63,19 +59,11 @@ class GlobalStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A } test("Fetch all current states from the beginning") { - store.currentStates(Offset.Start).assert(envelope1, envelope2) + store.currentStates(Offset.Start).assert(state1, state2) } test("Fetch all current states from offset 2") { - store.currentStates(Offset.at(1L)).assert(envelope2) - } - - test("Fetch all states from the beginning") { - store.states(Offset.Start).take(2).assert(envelope1, envelope2) - } - - test("Fetch all states from offset 2") { - store.states(Offset.at(1L)).take(1).assert(envelope2) + store.currentStates(Offset.at(1L)).assert(state2) } test("Update state 1 successfully") { @@ -87,7 +75,7 @@ class GlobalStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A } test("Fetch all current states from the beginning after updating state 1") { - store.currentStates(Offset.Start).assert(envelope2, envelope3) + store.currentStates(Offset.Start).assert(state2, updatedState1) } 
test("Delete state 2 successfully") { @@ -99,7 +87,7 @@ class GlobalStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A } test("Fetch all current states from the beginning after deleting state 2") { - store.currentStates(Offset.Start).assert(envelope3) + store.currentStates(Offset.Start).assert(updatedState1) } } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala index cc34ebfd05..c0c467248b 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala @@ -11,11 +11,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore.StateNotFound.{TagNotFound, UnknownState} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityCheck, PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ @@ -56,13 +57,20 @@ class ScopedStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A private val state3 = PullRequestActive(id1, project2, 1, Instant.EPOCH, Anonymous, Instant.EPOCH, 
alice) private val state4 = PullRequestActive(id4, project3, 1, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val envelope1 = Envelope(PullRequest.entityType, id1, 1, state1, Instant.EPOCH, Offset.at(1L)) - private val envelope2 = Envelope(PullRequest.entityType, id2, 1, state2, Instant.EPOCH, Offset.at(2L)) - private val envelope3 = Envelope(PullRequest.entityType, id1, 1, state3, Instant.EPOCH, Offset.at(3L)) - private val envelope4 = Envelope(PullRequest.entityType, id4, 1, state4, Instant.EPOCH, Offset.at(4L)) - private val envelope1Tagged = Envelope(PullRequest.entityType, id1, 1, state1, Instant.EPOCH, Offset.at(5L)) - private val envelope3Tagged = Envelope(PullRequest.entityType, id1, 1, state3, Instant.EPOCH, Offset.at(6L)) - private val envelopeUpdated1 = Envelope(PullRequest.entityType, id1, 2, updatedState1, Instant.EPOCH, Offset.at(7L)) + private val elem1 = + Elem.SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(1L), state1, 1) + private val elem2 = + Elem.SuccessElem(PullRequest.entityType, id2, Some(project1), Instant.EPOCH, Offset.at(2L), state2, 1) + private val elem3 = + Elem.SuccessElem(PullRequest.entityType, id1, Some(project2), Instant.EPOCH, Offset.at(3L), state3, 1) + private val elem4 = + Elem.SuccessElem(PullRequest.entityType, id4, Some(project3), Instant.EPOCH, Offset.at(4L), state4, 1) + private val elem1Tagged = + Elem.SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(5L), state1, 1) + private val elem3Tagged = + Elem.SuccessElem(PullRequest.entityType, id1, Some(project2), Instant.EPOCH, Offset.at(6L), state3, 1) + private val elem1Updated = + Elem.SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(7L), updatedState1, 2) private def assertCount(expected: Int) = sql"select count(*) from scoped_states".query[Int].unique.transact(xas.read).assertEquals(expected) @@ -86,35 +94,35 @@ class ScopedStateStoreSuite extends NexusSuite 
with Doobie.Fixture with Doobie.A } test("Fetch all current latest states from the beginning") { - store.currentStates(Scope.Root).assert(envelope1, envelope2, envelope3, envelope4) + store.currentStates(Scope.Root, Offset.Start).assert(elem1, elem2, elem3, elem4) } test("Fetch all latest states from the beginning") { - store.states(Scope.Root).assert(envelope1, envelope2, envelope3, envelope4) + store.states(Scope.Root, Offset.Start).assert(elem1, elem2, elem3, elem4) } test(s"Fetch current states for ${project1.organization} from the beginning") { - store.currentStates(Scope.Org(project1.organization)).assert(envelope1, envelope2, envelope3) + store.currentStates(Scope.Org(project1.organization), Offset.Start).assert(elem1, elem2, elem3) } test(s"Fetch states for ${project1.organization} from the beginning") { - store.states(Scope.Org(project1.organization)).assert(envelope1, envelope2, envelope3) + store.states(Scope.Org(project1.organization), Offset.Start).assert(elem1, elem2, elem3) } test(s"Fetch current states for $project1 from offset 2") { - store.currentStates(Scope.Project(project1), Offset.at(1L)).assert(envelope2) + store.currentStates(Scope.Project(project1), Offset.at(1L)).assert(elem2) } test(s"Fetch states for $project1 from offset 2") { - store.states(Scope.Project(project1), Offset.at(1L)).assert(envelope2) + store.states(Scope.Project(project1), Offset.at(1L)).assert(elem2) } test(s"Fetch all current states from the beginning for tag `$customTag`") { - store.currentStates(Scope.Root, customTag).assert(envelope1Tagged, envelope3Tagged) + store.currentStates(Scope.Root, customTag, Offset.Start).assert(elem1Tagged, elem3Tagged) } test(s"Fetch all states from the beginning for tag `$customTag`") { - store.states(Scope.Root, customTag).assert(envelope1Tagged, envelope3Tagged) + store.states(Scope.Root, customTag, Offset.Start).assert(elem1Tagged, elem3Tagged) } test("Update state 1 successfully") { @@ -126,7 +134,7 @@ class ScopedStateStoreSuite 
extends NexusSuite with Doobie.Fixture with Doobie.A } test("Fetch all current latest states from the beginning") { - store.currentStates(Scope.Root).assert(envelope2, envelope3, envelope4, envelopeUpdated1) + store.currentStates(Scope.Root, Offset.Start).assert(elem2, elem3, elem4, elem1Updated) } test("Delete tagged state 3 successfully") { @@ -138,7 +146,7 @@ class ScopedStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A } test(s"Fetch all states from the beginning for tag `$customTag` after deletion of `state3`") { - store.states(Scope.Root, customTag).assert(envelope1Tagged) + store.states(Scope.Root, customTag, Offset.Start).assert(elem1Tagged) } test("Check that the given ids does exist") {