Skip to content

Commit

Permalink
Add basic reporting for import batch (#4773)
Browse files Browse the repository at this point in the history
* Add basic reporting for import batch

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Mar 4, 2024
1 parent 4c63d50 commit 28052e7
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 52 deletions.
11 changes: 5 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -731,13 +731,12 @@ lazy val ship = project
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(sdk % "compile->compile;test->test", testkit % "test->compile")
.settings(
libraryDependencies ++= Seq(declineEffect),
libraryDependencies ++= Seq(declineEffect),
addCompilerPlugin(betterMonadicFor),
run / fork := true,
buildInfoKeys := Seq[BuildInfoKey](version),
buildInfoPackage := "ch.epfl.bluebrain.nexus.delta.ship",
Docker / packageName := "nexus-ship",
coverageFailOnMinimum := false
run / fork := true,
buildInfoKeys := Seq[BuildInfoKey](version),
buildInfoPackage := "ch.epfl.bluebrain.nexus.delta.ship",
Docker / packageName := "nexus-ship"
)

lazy val cargo = taskKey[(File, String)]("Run Cargo to build 'nexus-fixer'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class HttpClientSpec
object HttpClientSpec {
final case class Value(name: String, rev: Int, deprecated: Boolean)

final case class Count(
final private case class Count(
reqGetValue: AtomicInteger,
reqStreamValues: AtomicInteger,
reqClientError: AtomicInteger,
Expand All @@ -145,7 +145,7 @@ object HttpClientSpec {
reqOtherError.set(0)
}
}
object Count {
private object Count {
def apply(
reqGetValue: Int = 0,
reqStreamValues: Int = 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.EventProcessor.logger
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import fs2.Stream
Expand All @@ -19,9 +19,9 @@ trait EventProcessor[Event <: ScopedEvent] {

def decoder: Decoder[Event]

def evaluate(event: Event): IO[Unit]
def evaluate(event: Event): IO[ImportStatus]

def evaluate(event: InputEvent): IO[Unit] =
def evaluate(event: InputEvent): IO[ImportStatus] =
IO.fromEither(decoder.decodeJson(event.value))
.onError(err => logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}"))
.flatMap(evaluate)
Expand All @@ -31,28 +31,32 @@ object EventProcessor {

private val logger = Logger[EventProcessor.type]

def run(eventStream: Stream[IO, InputEvent], processors: EventProcessor[_]*): IO[Offset] = {
def run(eventStream: Stream[IO, InputEvent], processors: EventProcessor[_]*): IO[ImportReport] = {
val processorsMap = processors.foldLeft(Map.empty[EntityType, EventProcessor[_]]) { (acc, processor) =>
acc + (processor.resourceType -> processor)
}
eventStream
.evalTap { event =>
.evalScan(ImportReport.start) { case (report, event) =>
processorsMap.get(event.`type`) match {
case Some(processor) =>
processor.evaluate(event).onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...")
processor
.evaluate(event)
.map { status =>
report + (event, status)
}
.onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
case None =>
logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >>
IO.pure(report + (event, ImportStatus.Dropped))
}
}
.scan((0, Offset.start)) { case ((count, _), event) => (count + 1, event.ordering) }
.compile
.lastOrError
.flatMap { case (count, offset) =>
logger.info(s"$count events were imported up to offset $offset").as(offset)
}
.flatTap { report => logger.info(report.show) }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import java.util.UUID
/**
* In this batch, we should not use any id so it helps prevent it
*/
// $COVERAGE-OFF$
object FailingUUID extends UUIDF {
override def apply(): IO[UUID] =
IO.raiseError(new IllegalStateException("Generation of UUID is now allowed as we don't expect id generation"))
}
// $COVERAGE-ON$
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ch.epfl.bluebrain.nexus.ship

import cats.Show
import cats.kernel.Monoid
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.model.InputEvent

import java.time.Instant

final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) {
def +(event: InputEvent, status: ImportStatus): ImportReport = {
val entityType = event.`type`
val newProgress = progress.updatedWith(entityType) {
case Some(count) => Some(count |+| status.asCount)
case None => Some(status.asCount)
}
copy(offset = event.ordering, instant = event.instant, progress = newProgress)
}
}

object ImportReport {

val start: ImportReport = ImportReport(Offset.start, Instant.EPOCH, Map.empty)

final case class Count(success: Long, dropped: Long)

object Count {

implicit val countMonoid: Monoid[Count] = new Monoid[Count] {
override def empty: Count = Count(0L, 0L)

override def combine(x: Count, y: Count): Count = Count(x.success + y.success, x.dropped + y.dropped)
}

}

implicit val showReport: Show[ImportReport] = (report: ImportReport) => {
val header = s"Type\tSuccess\tDropped\n"
val details = report.progress.foldLeft(header) { case (acc, (entityType, count)) =>
acc ++ s"$entityType\t${count.success}\t${count.dropped}\n"
}

val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Count(0L, 0L))
val global =
s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.dropped} have been dropped)."
s"$global\n$details"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.ship.ImportReport.Count

sealed trait ImportStatus {

def asCount: Count

}

object ImportStatus {

case object Success extends ImportStatus {
override def asCount: Count = Count(1L, 0L)
}

case object Dropped extends ImportStatus {

override def asCount: Count = Count(0L, 1L)

}

}
13 changes: 7 additions & 6 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,17 @@ object Main
}
.map(_.as(ExitCode.Success))

private[ship] def run(file: Path, config: Option[Path]): IO[Unit] = {
private[ship] def run(file: Path, config: Option[Path]): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
_ <- logger.info(s"Running the import with file $file, config $config and from offset $offset")
config <- ShipConfig.load(config)
_ <- Transactors.init(config.database).use { xas =>
val orgProvider = OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
Expand All @@ -79,10 +80,10 @@ object Main
fetchActiveOrg = FetchActiveOrganization(xas)
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
_ <- EventProcessor.run(events, projectProcessor, resolverProcessor)
} yield ()
report <- EventProcessor.run(events, projectProcessor, resolverProcessor)
} yield report
}
} yield ()
} yield report
}

private[ship] def showConfig(config: Option[Path]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.ship.error

import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

// $COVERAGE-OFF$
sealed abstract class ShipError(reason: String) extends Exception { self =>
override def fillInStackTrace(): Throwable = self

Expand All @@ -14,3 +15,4 @@ object ShipError {
extends ShipError(s"'$project' is not allowed during import.")

}
// $COVERAGE-ON$
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
import ch.epfl.bluebrain.nexus.ship.error.ShipError.ProjectDeletionIsNotAllowed
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, EventUUIDF}
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, EventUUIDF, ImportStatus}
import io.circe.Decoder

final class ProjectProcessor private (projects: Projects, clock: EventClock, uuidF: EventUUIDF)
Expand All @@ -24,15 +24,15 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui

override def decoder: Decoder[ProjectEvent] = ProjectEvent.serializer.codec

override def evaluate(event: ProjectEvent): IO[Unit] = {
override def evaluate(event: ProjectEvent): IO[ImportStatus] = {
for {
_ <- clock.setInstant(event.instant)
_ <- uuidF.setUUID(event.uuid)
_ <- evaluateInternal(event)
} yield ()
_ <- clock.setInstant(event.instant)
_ <- uuidF.setUUID(event.uuid)
result <- evaluateInternal(event)
} yield result
}

private def evaluateInternal(event: ProjectEvent) = {
private def evaluateInternal(event: ProjectEvent): IO[ImportStatus] = {
implicit val s: Subject = event.subject
val projectRef = event.project
val cRev = event.rev - 1
Expand All @@ -50,10 +50,14 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui
case _: ProjectMarkedForDeletion =>
IO.raiseError(ProjectDeletionIsNotAllowed(projectRef))
}
}.recoverWith {
case notFound: NotFound => IO.raiseError(notFound)
case error: ProjectRejection => logger.warn(error)(error.reason)
}.void
}.redeemWith(
{
case notFound: NotFound => IO.raiseError(notFound)
case error: ProjectRejection => logger.warn(error)(error.reason).as(ImportStatus.Dropped)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)
}

object ProjectProcessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Identity}
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID}
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus}
import io.circe.Decoder

class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extends EventProcessor[ResolverEvent] {
override def resourceType: EntityType = Resolvers.entityType

override def decoder: Decoder[ResolverEvent] = ResolverEvent.serializer.codec

override def evaluate(event: ResolverEvent): IO[Unit] = {
override def evaluate(event: ResolverEvent): IO[ImportStatus] = {
for {
_ <- clock.setInstant(event.instant)
_ <- evaluateInternal(event)
} yield ()
_ <- clock.setInstant(event.instant)
result <- evaluateInternal(event)
} yield result
}

private def evaluateInternal(event: ResolverEvent): IO[Unit] = {
private def evaluateInternal(event: ResolverEvent): IO[ImportStatus] = {
val id = event.id
implicit val s: Subject = event.subject
val projectRef = event.project
Expand All @@ -50,10 +50,14 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend
case _: ResolverDeprecated =>
resolvers.deprecate(id, projectRef, cRev)
}
}.recoverWith {
case a: ResourceAlreadyExists => logger.warn(a)("The resolver already exists")
case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided")
}.void
}.redeemWith(
{
case a: ResourceAlreadyExists => logger.warn(a)("The resolver already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided").as(ImportStatus.Dropped)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)

private def identities(value: ResolverValue) =
value match {
Expand Down
4 changes: 3 additions & 1 deletion ship/src/test/resources/import/import.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
{"ordering":4879496,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 2, "@type": "ResolverUpdated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "UseCurrentCaller"}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["Resolver", "CrossProject"], "@context": ["https://bluebrain.github.io/nexus/contexts/resolvers.json", "https://bluebrain.github.io/nexus/contexts/metadata.json"], "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "useCurrentCaller": true}, "instant": "2022-11-16T13:42:07.498Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2022-11-16T14:42:07.498+01:00"}
{"ordering":5300965,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":2,"value":{"rev": 2, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-07-16T18:42:59.530Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-07-16T20:42:59.53+02:00"}
{"ordering":5318566,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":3,"value":{"rev": 3, "base": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-07-21T13:55:02.463Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-07-21T15:55:02.463+02:00"}
{"ordering":5418473,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":4,"value":{"rev": 4, "base": "https://bbp.epfl.ch/data/public/sscx/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-08-22T15:05:13.654Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-08-22T17:05:13.654+02:00"}
{"ordering":5418473,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":4,"value":{"rev": 4, "base": "https://bbp.epfl.ch/data/public/sscx/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectUpdated", "label": "sscx", "vocab": "https://bbp.epfl.ch/ontologies/core/bmo/", "instant": "2023-08-22T15:05:13.654Z", "subject": {"@type": "User", "realm": "bbp", "subject": "alice"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2023-08-22T17:05:13.654+02:00"}
{"ordering":9999998,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":5,"value":{"rev": 5, "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectDeprecated", "label": "sscx", "instant": "2099-12-30T23:59:59.999+01:00", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2099-12-30T23:59:59.999+01:00"}
{"ordering":9999999,"type":"xxx", "org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/xxx","rev":1,"value":{},"instant":"2099-12-31T23:59:59.999+01:00"}
Loading

0 comments on commit 28052e7

Please sign in to comment.