Skip to content

Commit

Permalink
Add endpoint to list indexing errors for Blazegraph (#4111)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jul 27, 2023
1 parent d5fe848 commit ad6b3ec
Show file tree
Hide file tree
Showing 24 changed files with 843 additions and 566 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsCo
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down Expand Up @@ -178,8 +178,6 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
identities: Identities,
aclCheck: AclCheck,
views: BlazegraphViews,
projections: Projections,
projectionErrors: ProjectionErrors,
viewsQuery: BlazegraphViewsQuery,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
Expand All @@ -196,8 +194,6 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
viewsQuery,
identities,
aclCheck,
projections,
projectionErrors,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
)(
Expand All @@ -210,6 +206,36 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)
}

make[BlazegraphViewsIndexingRoutes].from {
(
identities: Identities,
aclCheck: AclCheck,
views: BlazegraphViews,
projections: Projections,
projectionErrors: ProjectionErrors,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
cfg: BlazegraphViewsConfig,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new BlazegraphViewsIndexingRoutes(
views.fetchIndexingView(_, _),
identities,
aclCheck,
projections,
projectionErrors,
schemeDirectives
)(
baseUri,
s,
cr,
ordering,
cfg.pagination
)
}

make[BlazegraphScopeInitialization].from {
(views: BlazegraphViews, serviceAccount: ServiceAccount, config: BlazegraphViewsConfig) =>
new BlazegraphScopeInitialization(views, serviceAccount, config.defaults)
Expand Down Expand Up @@ -240,8 +266,22 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {

many[ApiMappings].add(BlazegraphViews.mappings)

many[PriorityRoute].add { (route: BlazegraphViewsRoutes) =>
PriorityRoute(priority, route.routes, requiresStrictEntity = true)
many[PriorityRoute].add {
(
bg: BlazegraphViewsRoutes,
indexing: BlazegraphViewsIndexingRoutes,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri
) =>
PriorityRoute(
priority,
BlazegraphViewsRoutesHandler(
schemeDirectives,
bg.routes,
indexing.routes
)(baseUri),
requiresStrictEntity = true
)
}

many[ServiceDependency].add { (client: BlazegraphClient @Id("blazegraph-indexing-client")) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,15 @@ object IndexingViewDef {
namespace: String,
indexingRev: Int,
rev: Int
) extends IndexingViewDef
) extends IndexingViewDef {
def projectionMetadata: ProjectionMetadata =
ProjectionMetadata(
BlazegraphViews.entityType.value,
projection,
Some(ref.project),
Some(ref.viewId)
)
}

/**
* Deprecated view to be cleaned up and removed from the supervisor
Expand Down Expand Up @@ -89,22 +97,14 @@ object IndexingViewDef {
stream: Offset => ElemStream[GraphResource],
sink: Sink
): Task[CompiledProjection] = {
val project = v.ref.project
val id = v.ref.viewId
val metadata = ProjectionMetadata(
BlazegraphViews.entityType.value,
v.projection,
Some(project),
Some(id)
)

val postPipes: Operation = GraphResourceToNTriples

val compiled = for {
pipes <- v.pipeChain.traverse(compilePipeChain)
chain = pipes.fold(NonEmptyChain.one(postPipes))(NonEmptyChain(_, postPipes))
projection <- CompiledProjection.compile(
metadata,
v.projectionMetadata,
ExecutionStrategy.PersistentSingleNode,
Source(stream),
chain,
Expand All @@ -113,7 +113,7 @@ object IndexingViewDef {
} yield projection

Task.fromEither(compiled).tapError { e =>
Task.delay(logger.error(s"View '$project/$id' could not be compiled.", e))
Task.delay(logger.error(s"View '${v.ref}' could not be compiled.", e))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.permissions.{write => Write}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsIndexingRoutes.FetchIndexingView
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults.searchResultsJsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{PaginationConfig, SearchResults}
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment}
import ch.epfl.bluebrain.nexus.delta.sourcing.ProgressStatistics
import ch.epfl.bluebrain.nexus.delta.sourcing.model.FailedElemLogRow.FailedElemData
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._
import monix.bio.IO
import monix.execution.Scheduler
class BlazegraphViewsIndexingRoutes(
fetch: FetchIndexingView,
identities: Identities,
aclCheck: AclCheck,
projections: Projections,
projectionErrors: ProjectionErrors,
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
pc: PaginationConfig
) extends AuthDirectives(identities, aclCheck)
with CirceUnmarshalling
with DeltaDirectives
with RdfMarshalling
with BlazegraphViewsDirectives {

import schemeDirectives._

implicit private val viewStatisticEncoder: Encoder.AsObject[ProgressStatistics] =
deriveEncoder[ProgressStatistics].mapJsonObject(_.add(keywords.tpe, "ViewStatistics".asJson))

implicit private val viewStatisticJsonLdEncoder: JsonLdEncoder[ProgressStatistics] =
JsonLdEncoder.computeFromCirce(ContextValue(contexts.statistics))

def routes: Route =
pathPrefix("views") {
extractCaller { implicit caller =>
resolveProjectRef.apply { implicit ref =>
idSegment { id =>
concat(
// Fetch a blazegraph view statistics
(pathPrefix("statistics") & get & pathEndOrSingleSlash) {
authorizeFor(ref, permissions.read).apply {
emit(
fetch(id, ref)
.flatMap(v => projections.statistics(ref, v.resourceTag, v.projection))
.rejectOn[ViewNotFound]
)
}
},
// Fetch balzegraph view indexing failures
(pathPrefix("failures") & get) {
authorizeFor(ref, Write).apply {
concat(
(pathPrefix("sse") & lastEventId) { offset =>
emit(
fetch(id, ref)
.map { view =>
projectionErrors.sses(view.ref.project, view.ref.viewId, offset)
}
)
},
(fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) {
(pagination, timeRange, uri) =>
implicit val searchJsonLdEncoder: JsonLdEncoder[SearchResults[FailedElemData]] =
searchResultsJsonLdEncoder(FailedElemLogRow.context, pagination, uri)
emit(
fetch(id, ref)
.flatMap { view =>
projectionErrors.search(view.ref, pagination, timeRange)
}
)
}
)
}
},
// Manage an blazegraph view offset
(pathPrefix("offset") & pathEndOrSingleSlash) {
concat(
// Fetch a blazegraph view offset
(get & authorizeFor(ref, permissions.read)) {
emit(
fetch(id, ref)
.flatMap(v => projections.offset(v.projection))
.rejectOn[ViewNotFound]
)
},
// Remove an blazegraph view offset (restart the view)
(delete & authorizeFor(ref, Write)) {
emit(
fetch(id, ref)
.flatMap { r => projections.scheduleRestart(r.projection) }
.as(Offset.start)
.rejectOn[ViewNotFound]
)
}
)
}
)
}
}
}
}
}

object BlazegraphViewsIndexingRoutes {

type FetchIndexingView = (IdSegment, ProjectRef) => IO[BlazegraphViewRejection, ActiveViewDef]

/**
* @return
* the [[Route]] for BlazegraphViews
*/
def apply(
fetch: FetchIndexingView,
identities: Identities,
aclCheck: AclCheck,
projections: Projections,
projectionErrors: ProjectionErrors,
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
pc: PaginationConfig
): Route = {
new BlazegraphViewsIndexingRoutes(
fetch,
identities,
aclCheck,
projections,
projectionErrors: ProjectionErrors,
schemeDirectives
).routes
}
}
Loading

0 comments on commit ad6b3ec

Please sign in to comment.