Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Elasticsearch default views resetter #4780

Merged
merged 3 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId,
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import doobie.implicits._
Expand Down Expand Up @@ -92,15 +93,15 @@ object ElasticSearchDefaultViewsResetter {
override def resetDefaultViews: IO[Unit] =
resetTrigger.flatMap { triggered =>
IO.whenA(triggered) {
views.compile.toList
.flatMap { _.traverse { resetView } }
.flatMap { _ => logger.info("Completed resetting default elasticsearch views.") }
deleteDatabaseEntries >>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So was the fact that ES indices were deleted before the entries also an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a combination of that and the fact that we were reading from replicas when recreating the views, yes.
Reducing the number of transactions is not a bad thing anyway; it will also help the WAL to get replicated.

views.compile.toList
.flatMap { _.traverse { resetView } }
.flatMap { _ => logger.info("Completed resetting default elasticsearch views.") }
}
}

// Resets a single default view: drops its Elasticsearch index, removes its events/states/offsets
// from the database in one write transaction, then recreates the default view in the project.
// NOTE(review): the ES index is deleted before the database entries — confirm this ordering is intended,
// as the surrounding PR discussion questions it.
override def resetView(view: ViewElement): IO[Unit] =
deleteEsIndex(view) >>
deleteEventsStatesOffsets(view.project).transact(xas.write) >>
createDefaultView(view.project)

private def deleteEsIndex(view: ViewElement) =
Expand All @@ -109,12 +110,12 @@ object ElasticSearchDefaultViewsResetter {
case _: DeprecatedViewDef => IO.pure(true)
}.void

private def deleteEventsStatesOffsets(project: ProjectRef): doobie.ConnectionIO[Unit] =
private def deleteDatabaseEntries: IO[Unit] =
sql"""
DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = ${defaultViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = ${defaultViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = ${defaultViewId.toString} AND project = $project;
""".stripMargin.update.run.void
DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = $defaultViewId;
DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = $defaultViewId;
DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = $defaultViewId;
""".stripMargin.update.run.void.transact(xas.write)

// Recreates the default Elasticsearch view in the given project using the configured default view value.
private def createDefaultView(project: ProjectRef): IO[Unit] =
createView(defaultViewId, project, newViewValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
Expand Down Expand Up @@ -46,59 +46,15 @@ class ElasticSearchDefaultViewsResetterSuite
)

// A non-default view id used to verify that the resetter only touches default views.
private val customViewId = iri"https://other.id"
private val customView = defaultView.copy(ref = ViewRef(project, iri"$customViewId"))

// Wrappers distinguishing existing views from a project whose default view is missing entirely.
private val existingDefaultView = ExistingView(defaultView)
private val missingDefaultView = MissingView(project3)
private val existingCustomView = ExistingView(customView)

// TODO: Find how to move this to beforeAll
// Creates the per-project table partitions required before any scoped rows can be inserted.
test("Setup: partitions should be created") {
initPartitions(xas, project, project2)
}

// Resetting an existing default view must remove its rows from scoped_events.
test("The resetter should delete scoped events for an existing default view") {
insertViewEvent(defaultViewId, project) >>
assertIO(eventsCount, 1) >>
resetWithNoViewCreation.resetView(existingDefaultView) >>
assertIO(eventsCount, 0)
}

// Resetting an existing default view must remove its rows from scoped_states.
test("The resetter should delete scoped states for an existing default view") {
insertViewState(defaultViewId, project) >>
assertIO(statesCount, 1) >>
resetWithNoViewCreation.resetView(existingDefaultView) >>
assertIO(statesCount, 0)
}

// Resetting an existing default view must remove its rows from projection_offsets.
test("The resetter should delete projection offsets for an existing default view") {
insertViewProjectionOffset(defaultViewId, project) >>
assertIO(projectionOffsetCount, 1) >>
resetWithNoViewCreation.resetView(existingDefaultView) >>
assertIO(projectionOffsetCount, 0)
}

// A custom (non-default) view's events must survive a reset.
test("The resetter should not delete a scoped event for an existing custom view") {
insertViewEvent(customViewId, project) >>
assertIO(eventsCount, 1) >>
resetWithNoViewCreation.resetView(existingCustomView) >>
assertIO(eventsCount, 1)
}

// A custom (non-default) view's states must survive a reset.
test("The resetter should not delete a scoped state for an existing custom view") {
insertViewState(customViewId, project) >>
assertIO(statesCount, 1) >>
resetWithNoViewCreation.resetView(existingCustomView) >>
assertIO(statesCount, 1)
}

// A custom (non-default) view's projection offsets must survive a reset.
test("The resetter should not delete projection offsets for a custom view") {
insertViewProjectionOffset(customViewId, project) >>
assertIO(projectionOffsetCount, 1) >>
resetWithNoViewCreation.resetView(existingCustomView) >>
assertIO(projectionOffsetCount, 1)
}

test("The resetter should create a new view") {
for {
createdViewRef <- Ref.of[IO, Set[ViewRef]](Set.empty[ViewRef])
Expand All @@ -119,11 +75,18 @@ class ElasticSearchDefaultViewsResetterSuite
for {
createdViewRef <- Ref.of[IO, Set[ViewRef]](Set.empty[ViewRef])
_ <- clearDB
_ <- insertViewEvent(defaultViewId, project)
_ <- insertViewEvent(defaultViewId, project2)
_ <- assertIO(eventsCount, 2)
_ <- insertView(defaultViewId, project)
_ <- insertView(defaultViewId, project2)
_ <- insertView(customViewId, project)
expectedViews = Set(project -> defaultViewId, project2 -> defaultViewId, project -> customViewId)
_ <- assertIO(eventsId, expectedViews)
_ <- assertIO(statesId, expectedViews)
_ <- assertIO(projectionOffsetIds, expectedViews)
_ <- resetWithViewCreation(createdViewRef).resetDefaultViews
_ <- assertIO(eventsCount, 0)
remainingCustom = Set(project -> customViewId)
_ <- assertIO(eventsId, remainingCustom)
_ <- assertIO(statesId, remainingCustom)
_ <- assertIO(projectionOffsetIds, remainingCustom)
expectedCreatedViews = Set(project, project2, project3).map(ViewRef(_, defaultViewId))
_ <- assertIO(createdViewRef.get, expectedCreatedViews)
} yield ()
Expand All @@ -146,15 +109,6 @@ class ElasticSearchDefaultViewsResetterSuite
private val viewElem3 = missingDefaultView
private val viewStream = Stream(viewElem1, viewElem2, viewElem3)

// Resetter stub whose view-creation callback is a no-op, used to exercise the deletion logic in isolation.
// `IO.pure` replaces `IO(...)`: the booleans are already-computed constants, so no suspension is needed.
private def resetWithNoViewCreation = ElasticSearchDefaultViewsResetter(
viewStream,
_ => IO.pure(true),
(_, _, _) => IO.unit,
defaultViewValue,
IO.pure(true),
xas
)

private def resetWithViewCreation(created: Ref[IO, Set[ViewRef]]) =
ElasticSearchDefaultViewsResetter(
viewStream,
Expand All @@ -175,6 +129,9 @@ class ElasticSearchDefaultViewsResetterSuite
xas
)

// Seeds all three database tables (event, state, projection offset) for the given view id and project.
private def insertView(id: Iri, projectRef: ProjectRef) =
for {
_    <- insertViewEvent(id, projectRef)
_    <- insertViewState(id, projectRef)
last <- insertViewProjectionOffset(id, projectRef)
} yield last

private def insertViewEvent(id: Iri, projectRef: ProjectRef) =
sql"""
INSERT INTO scoped_events (type, org, project, id, rev, value, instant)
Expand All @@ -187,33 +144,37 @@ class ElasticSearchDefaultViewsResetterSuite
VALUES ('elasticsearch', ${projectRef.organization}, ${projectRef.project}, $id, 'tag', 5, '{"nb": 1}', false, CURRENT_TIMESTAMP);
""".stripMargin.update.run.transact(xas.write)

private def insertViewProjectionOffset(id: Iri, projectRef: ProjectRef) =
private def insertViewProjectionOffset(id: Iri, projectRef: ProjectRef) = {
val projectionName = s"${projectRef}_$id"
sql"""
INSERT INTO projection_offsets (name, module, project, resource_id, ordering, processed, discarded, failed, created_at, updated_at)
VALUES ('default', 'elasticsearch', $projectRef, $id, 123, 2, 1, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
VALUES ($projectionName, 'elasticsearch', $projectRef, $id, 123, 2, 1, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
""".stripMargin.update.run.transact(xas.write)
}

// Truncates the three tables touched by the resetter so each test starts from a clean slate.
private def clearDB =
sql"""
DELETE FROM scoped_events; DELETE FROM scoped_states; DELETE FROM projection_offsets;
""".stripMargin.update.run.transact(xas.write)

private def eventsCount =
sql"""SELECT count(*) FROM scoped_events;""".stripMargin
.query[Int]
.unique
// Returns the (project, view id) pairs currently present in scoped_events, via the read transactor.
private def eventsId: IO[Set[(ProjectRef, Iri)]] =
sql"""SELECT org, project, id FROM scoped_events;""".stripMargin
.query[(Label, Label, Iri)]
.map { case (org, proj, id) => ProjectRef(org, proj) -> id }
.to[Set]
.transact(xas.read)

private def statesCount =
sql"""SELECT count(*) FROM scoped_states;""".stripMargin
.query[Int]
.unique
// Returns the (project, view id) pairs currently present in scoped_states, via the read transactor.
private def statesId: IO[Set[(ProjectRef, Iri)]] =
sql"""SELECT org, project, id FROM scoped_states;""".stripMargin
.query[(Label, Label, Iri)]
.map { case (org, proj, id) => ProjectRef(org, proj) -> id }
.to[Set]
.transact(xas.read)

private def projectionOffsetCount =
sql"""SELECT count(*) FROM projection_offsets;""".stripMargin
.query[Int]
.unique
// Returns the (project, resource id) pairs currently present in projection_offsets.
private def projectionOffsetIds: IO[Set[(ProjectRef, Iri)]] =
sql"""SELECT project, resource_id FROM projection_offsets;""".stripMargin
.query[(ProjectRef, Iri)]
.to[Set]
.transact(xas.read)

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ object GlobalEventLog {
}

override def evaluate(id: Id, command: Command): IO[(E, S)] =
stateStore.get(id).flatMap { current =>
stateStore.getWrite(id).flatMap { current =>
stateMachine
.evaluate(current, command, maxDuration)
.flatTap { case (event, state) =>
Expand All @@ -148,7 +148,7 @@ object GlobalEventLog {
}

override def dryRun(id: Id, command: Command): IO[(E, S)] =
stateStore.get(id).flatMap { current =>
stateStore.getWrite(id).flatMap { current =>
stateMachine.evaluate(current, command, maxDuration)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ object ScopedEventLog {
}

for {
originalState <- stateStore.get(ref, id).redeem(_ => None, Some(_))
originalState <- stateStore.getWrite(ref, id).redeem(_ => None, Some(_))
result <- stateMachine.evaluate(originalState, command, maxDuration)
_ <- persist(result._1, originalState, result._2)
} yield result
Expand All @@ -298,7 +298,7 @@ object ScopedEventLog {
sql.getSQLState == sqlstate.class23.UNIQUE_VIOLATION.value

override def dryRun(ref: ProjectRef, id: Id, command: Command): IO[(E, S)] =
stateStore.get(ref, id).redeem(_ => None, Some(_)).flatMap { state =>
stateStore.getWrite(ref, id).redeem(_ => None, Some(_)).flatMap { state =>
stateMachine.evaluate(state, command, maxDuration)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ trait GlobalStateStore[Id, S <: GlobalState] {
*/
def delete(id: Id): ConnectionIO[Unit]

/**
* Returns the latest state for the given id, reading from the primary (write) node so that the
* result is not affected by replication lag when Postgres runs in a replicated setup.
* Yields [[None]] when no state exists for the id.
*/
def getWrite(id: Id): IO[Option[S]]

/**
* Returns the state
*/
Expand Down Expand Up @@ -102,7 +108,11 @@ object GlobalStateStore {
override def delete(id: Id): ConnectionIO[Unit] =
sql"""DELETE FROM global_states WHERE type = $tpe AND id = $id""".stripMargin.update.run.void

override def get(id: Id): IO[Option[S]] = GlobalStateGet[Id, S](tpe, id).transact(xas.read)
// Reads from the primary so callers observe their own writes even under streaming replication.
override def getWrite(id: Id): IO[Option[S]] = get(id, xas.write)

// Default read path: may be served by a read replica.
override def get(id: Id): IO[Option[S]] = get(id, xas.read)

// Shared implementation; the transactor decides whether the primary or a replica is queried.
private def get(id: Id, xa: Transactor[IO]) = GlobalStateGet[Id, S](tpe, id).transact(xa)

private def states(offset: Offset, strategy: RefreshStrategy): Stream[IO, S] =
StreamingQuery[(S, Long)](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ trait ScopedStateStore[Id, S <: ScopedState] {
*/
def delete(ref: ProjectRef, id: Id, tag: Tag): ConnectionIO[Unit]

/**
* Returns the latest state for the given project and id, reading from the primary (write) node so
* that the result is not affected by replication lag when Postgres runs in a replicated setup.
*/
def getWrite(ref: ProjectRef, id: Id): IO[S]

/**
* Returns the latest state
*/
Expand Down Expand Up @@ -208,8 +214,12 @@ object ScopedStateStore {
.option
.map(_.isDefined)

override def get(ref: ProjectRef, id: Id): IO[S] =
ScopedStateGet.latest[Id, S](tpe, ref, id).transact(xas.read).flatMap { s =>
// Reads from the primary so callers observe their own writes even under streaming replication.
override def getWrite(ref: ProjectRef, id: Id): IO[S] = get(ref, id, xas.write)

// Default read path: may be served by a read replica.
override def get(ref: ProjectRef, id: Id): IO[S] = get(ref, id, xas.read)

// Shared implementation; fails the IO with UnknownState when no state row exists.
private def get(ref: ProjectRef, id: Id, xa: Transactor[IO]) =
ScopedStateGet.latest[Id, S](tpe, ref, id).transact(xa).flatMap { s =>
IO.fromOption(s)(UnknownState)
}

Expand Down