Skip to content

Commit

Permalink
Replace Envelope with Elem.SuccessElem (#4629)
Browse files Browse the repository at this point in the history
* Replace Envelope with Elem.SuccessElem

* Remove event streaming from global tables

* Separate global/scoped event streaming, remove unused methods

* Remove more unused streaming methods

* Make successful streams of elements explicit in type

* Fix coordinator tests

* Renames

* Remove GlobalStateValue, parse from DB directly
  • Loading branch information
dantb authored Jan 12, 2024
1 parent 6f03f6d commit 783ec49
Show file tree
Hide file tree
Showing 41 changed files with 333 additions and 787 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object BlazegraphDeletionTask {

def apply(views: BlazegraphViews) =
new BlazegraphDeletionTask(
project => views.currentIndexingViews(project).evalMapFilter(_.toIO),
project => views.currentIndexingViews(project).map(_.value),
(v: ActiveViewDef, subject: Subject) =>
views.internalDeprecate(v.ref.viewId, v.ref.project, v.rev)(subject).handleErrorWith { r =>
logger.error(s"Deprecating '$v' resulted in error: '$r'.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand All @@ -27,7 +27,7 @@ import scala.concurrent.duration.FiniteDuration
* a maximum duration for the indexing
*/
final class BlazegraphIndexingAction(
fetchCurrentViews: ProjectRef => ElemStream[IndexingViewDef],
fetchCurrentViews: ProjectRef => SuccessElemStream[IndexingViewDef],
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
sink: ActiveViewDef => Sink,
override val timeout: FiniteDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

import java.util.UUID
Expand Down Expand Up @@ -318,29 +319,21 @@ final class BlazegraphViews(
/**
* Return the existing indexing views in a project in a finite stream
*/
def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
log.currentStates(Scope.Project(project)).evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
}

/**
* Return all existing indexing views in a finite stream
*/
def currentIndexingViews: ElemStream[IndexingViewDef] =
log.currentStates(Scope.Root).evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
def currentIndexingViews(project: ProjectRef): SuccessElemStream[IndexingViewDef] =
log.currentStates(Scope.Project(project)).evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return the indexing views in a non-ending stream
*/
def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
log.states(Scope.Root, start).evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
def indexingViews(start: Offset): SuccessElemStream[IndexingViewDef] =
log.states(Scope.Root, start).evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

private def toIndexViewDef(envelope: Envelope[BlazegraphViewState]) =
envelope.toElem { v => Some(v.project) }.traverse { v =>
private def toIndexViewDef(elem: Elem.SuccessElem[BlazegraphViewState]) =
elem.withProject(elem.value.project).traverse { v =>
IndexingViewDef(v, prefix)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -41,7 +41,7 @@ object BlazegraphCoordinator {
* the general supervisor
*/
final private class Active(
fetchViews: Offset => ElemStream[IndexingViewDef],
fetchViews: Offset => SuccessElemStream[IndexingViewDef],
graphStream: GraphResourceStream,
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
cache: LocalCache[ViewRef, ActiveViewDef],
Expand Down Expand Up @@ -145,7 +145,7 @@ object BlazegraphCoordinator {
}

def apply(
fetchViews: Offset => ElemStream[IndexingViewDef],
fetchViews: Offset => SuccessElemStream[IndexingViewDef],
graphStream: GraphResourceStream,
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
supervisor: Supervisor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
Expand Down Expand Up @@ -75,9 +75,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
ViewRef(project, id4)
)

private val id5 = nxv + "view5"

private def viewStream: ElemStream[IndexingViewDef] =
private def viewStream: SuccessElemStream[IndexingViewDef] =
Stream(
SuccessElem(
tpe = BlazegraphViews.entityType,
Expand Down Expand Up @@ -114,14 +112,6 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
offset = Offset.at(4L),
value = view4,
rev = 1
),
DroppedElem(
tpe = BlazegraphViews.entityType,
id = id5,
project = Some(project),
Instant.EPOCH,
Offset.at(5L),
rev = 1
)
)

Expand Down Expand Up @@ -159,7 +149,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
)

test("Collect only the adequate views") {
val expected = IdAcc(Set(id1), Set(id2, id4, id5), Set(id3))
val expected = IdAcc(Set(id1), Set(id2, id4), Set(id3))

indexingAction
.projections(project, elem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -65,7 +65,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
ViewRef(project, id3),
projection = id3.toString,
SelectFilter.latest,
Some(PipeChain(PipeRef.unsafe("xxx") -> ExpandedJsonLd.empty)),
Some(PipeChain(unknownPipe -> ExpandedJsonLd.empty)),
namespace = "view3",
indexingRev,
rev
Expand All @@ -86,8 +86,8 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture

private val resumeSignal = SignallingRef[IO, Boolean](false).unsafeRunSync()

// Streams 4 elements until signal is set to true and then a failed item, 1 updated view and 1 deprecated view
private def viewStream: ElemStream[IndexingViewDef] =
// Streams 3 elements until signal is set to true, then 1 updated view and 1 deprecated view
private def viewStream: SuccessElemStream[IndexingViewDef] =
Stream(
SuccessElem(
tpe = BlazegraphViews.entityType,
Expand All @@ -98,20 +98,12 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
value = view1,
rev = 1
),
DroppedElem(
tpe = BlazegraphViews.entityType,
id = nxv + "dropped",
project = Some(project),
Instant.EPOCH,
Offset.at(2L),
rev = 1
),
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view2.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(3L),
offset = Offset.at(2L),
value = view2,
rev = 1
),
Expand All @@ -120,26 +112,17 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
id = view3.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(4L),
offset = Offset.at(3L),
value = view3,
rev = 1
)
) ++ Stream.never[IO].interruptWhen(resumeSignal) ++ Stream(
FailedElem(
tpe = BlazegraphViews.entityType,
id = nxv + "failed_coord",
project = Some(project),
Instant.EPOCH,
Offset.at(5L),
new IllegalStateException("Something got wrong :("),
rev = 1
),
SuccessElem(
tpe = BlazegraphViews.entityType,
id = deprecatedView1.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(6L),
offset = Offset.at(4L),
value = deprecatedView1,
rev = 1
),
Expand All @@ -148,17 +131,17 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
id = updatedView2.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(7L),
offset = Offset.at(5L),
value = updatedView2,
rev = 1
),
// Elem at offset 8 represents a view update that does not require reindexing
// Elem at offset 6 represents a view update that does not require reindexing
SuccessElem(
tpe = BlazegraphViews.entityType,
id = updatedView2.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(8L),
offset = Offset.at(6L),
value = updatedView2,
rev = 1
)
Expand Down Expand Up @@ -187,7 +170,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
)
_ <- sv.describe(BlazegraphCoordinator.metadata.name)
.map(_.map(_.progress))
.assertEquals(Some(ProjectionProgress(Offset.at(4L), Instant.EPOCH, 4, 1, 1)))
.assertEquals(Some(ProjectionProgress(Offset.at(3L), Instant.EPOCH, 3, 0, 1)))
.eventually
} yield ()
}
Expand Down Expand Up @@ -268,7 +251,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
_ <- resumeSignal.set(true)
_ <- sv.describe(BlazegraphCoordinator.metadata.name)
.map(_.map(_.progress))
.assertEquals(Some(ProjectionProgress(Offset.at(8L), Instant.EPOCH, 8, 1, 2)))
.assertEquals(Some(ProjectionProgress(Offset.at(6L), Instant.EPOCH, 6, 0, 1)))
.eventually
} yield ()
}
Expand Down Expand Up @@ -311,17 +294,9 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
} yield ()
}

test("Coordinator projection should have one error after failed elem offset 3") {
for {
entries <- projectionErrors.failedElemEntries(BlazegraphCoordinator.metadata.name, Offset.At(3L)).compile.toList
r = entries.assertOneElem
_ = assertEquals(r.failedElemData.id, nxv + "failed_coord")
} yield ()
}

test("View 2_2 projection should have one error after failed elem offset 3") {
test("View 2_2 projection should have one error after failed elem offset 2") {
for {
entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(3L)).compile.toList
entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(2L)).compile.toList
r = entries.assertOneElem
_ = assertEquals(r.failedElemData.id, nxv + "failed")
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

/**
Expand Down Expand Up @@ -332,20 +333,14 @@ final class CompositeViews private (
def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] =
log.currentStates(Scope.Project(project)).map(toCompositeViewDef)

/**
* Return all existing indexing views in a finite stream
*/
def currentViews: ElemStream[CompositeViewDef] =
log.currentStates(Scope.Root).map(toCompositeViewDef)

/**
* Return the indexing views in a non-ending stream
*/
def views(start: Offset): ElemStream[CompositeViewDef] =
log.states(Scope.Root, start).map(toCompositeViewDef)

private def toCompositeViewDef(envelope: Envelope[CompositeViewState]) =
envelope.toElem { v => Some(v.project) }.map { v =>
private def toCompositeViewDef(elem: Elem.SuccessElem[CompositeViewState]) =
elem.withProject(elem.value.project).mapValue { v =>
CompositeViewDef(v)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSear
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand All @@ -28,7 +28,7 @@ import scala.concurrent.duration.FiniteDuration
* a maximum duration for the indexing
*/
final class ElasticSearchIndexingAction(
fetchCurrentViews: ProjectRef => ElemStream[IndexingViewDef],
fetchCurrentViews: ProjectRef => SuccessElemStream[IndexingViewDef],
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
sink: ActiveViewDef => Sink,
override val timeout: FiniteDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

import java.util.UUID
Expand Down Expand Up @@ -367,35 +368,35 @@ final class ElasticSearchViews private (
/**
* Return the existing indexing views in a project in a finite stream
*/
def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
def currentIndexingViews(project: ProjectRef): SuccessElemStream[IndexingViewDef] =
log
.currentStates(Scope.Project(project))
.evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
.evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return all existing indexing views in a finite stream
*/
def currentIndexingViews: ElemStream[IndexingViewDef] =
def currentIndexingViews: SuccessElemStream[IndexingViewDef] =
log
.currentStates(Scope.Root)
.evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
.evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return the indexing views in a non-ending stream
*/
def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
def indexingViews(start: Offset): SuccessElemStream[IndexingViewDef] =
log
.states(Scope.Root, start)
.evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
.evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

private def toIndexViewDef(envelope: Envelope[ElasticSearchViewState]) =
envelope.toElem { v => Some(v.project) }.traverse { v =>
private def toIndexViewDef(elem: Elem.SuccessElem[ElasticSearchViewState]) =
elem.withProject(elem.value.project).traverse { v =>
IndexingViewDef(v, defaultElasticsearchMapping, defaultElasticsearchSettings, prefix)
}

Expand Down
Loading

0 comments on commit 783ec49

Please sign in to comment.