Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter by type in the SQL query for Composite Views #4231

Merged
merged 6 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import io.circe.syntax.EncoderOps
Expand Down Expand Up @@ -54,30 +54,32 @@ class ElemRoutes(
pathPrefix("elems") {
resolveProjectRef { project =>
authorizeFor(project, events.read).apply {
concat(
(get & pathPrefix("continuous") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, SelectFilter.tagOrLatest(tag), offset))
(parameter("tag".as[UserTag].?) & types(project)) { (tag, types) =>
concat(
(get & pathPrefix("continuous")) {
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
}
},
(get & pathPrefix("currents")) {
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
}
},
(get & pathPrefix("remaining")) {
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, SelectFilter(types, tag.getOrElse(Latest)), offset).map {
r => r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
}
},
head {
complete(OK)
}
},
(get & pathPrefix("currents") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, SelectFilter.tagOrLatest(tag), offset))
}
},
(get & pathPrefix("remaining") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, SelectFilter.tagOrLatest(tag), offset).map { r =>
r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
}
},
head {
complete(OK)
}
)
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,14 @@ object DeltaClient {
}
}

private def typeQuery(types: Set[Iri]) =
if (types.isEmpty) Query.Empty
else Query(types.map(t => "type" -> t.toString).toList: _*)

private def elemAddress(source: RemoteProjectSource) =
source.endpoint / "elems" / source.project.organization.value / source.project.project.value
(source.endpoint / "elems" / source.project.organization.value / source.project.project.value)
.withQuery(Query("tag" -> source.selectFilter.tag.toString))
.withQuery(typeQuery(source.selectFilter.types))

override def resourceAsNQuads(source: RemoteProjectSource, id: Iri): HttpResult[Option[NQuads]] = {
implicit val cred: Option[AuthToken] = token(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.semiauto.deriveDefaultJs
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.instances._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PipeChain
import io.circe.{Encoder, Json}

Expand Down Expand Up @@ -54,6 +55,13 @@ sealed trait CompositeViewSource extends Product with Serializable {
*/
def resourceTag: Option[UserTag]

/**
* @return
* the [[SelectFilter]] for the given view; used to filter the data that is indexed
*/
def selectFilter: SelectFilter =
SelectFilter(resourceTypes, resourceTag.getOrElse(Latest))

/**
* @return
* whether to consider deprecated resources for indexing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewS
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemPipe, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{RemainingElems, Source}
import io.circe.Json
Expand Down Expand Up @@ -56,27 +55,27 @@ object CompositeGraphStream {
override def main(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
Source(local.continuous(project, SelectFilter.tagOrLatest(p.resourceTag), _).through(drainSource))
Source(local.continuous(project, p.selectFilter, _).through(drainSource))
case c: CrossProjectSource =>
Source(local.continuous(c.project, SelectFilter.tagOrLatest(c.resourceTag), _).through(drainSource))
Source(local.continuous(c.project, c.selectFilter, _).through(drainSource))
case r: RemoteProjectSource => remote.main(r)
}
}

override def rebuild(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
Source(local.currents(project, SelectFilter.tagOrLatest(p.resourceTag), _).through(drainSource))
Source(local.currents(project, p.selectFilter, _).through(drainSource))
case c: CrossProjectSource =>
Source(local.currents(c.project, SelectFilter.tagOrLatest(c.resourceTag), _).through(drainSource))
Source(local.currents(c.project, c.selectFilter, _).through(drainSource))
case r: RemoteProjectSource => remote.rebuild(r)
}
}

override def remaining(source: CompositeViewSource, project: ProjectRef): Offset => UIO[Option[RemainingElems]] =
source match {
case p: ProjectSource => local.remaining(project, SelectFilter.tagOrLatest(p.resourceTag), _)
case c: CrossProjectSource => local.remaining(c.project, SelectFilter.tagOrLatest(c.resourceTag), _)
case p: ProjectSource => local.remaining(project, p.selectFilter, _)
case c: CrossProjectSource => local.remaining(c.project, c.selectFilter, _)
case r: RemoteProjectSource => remote.remaining(r, _).map(Some(_))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,4 @@ object SelectFilter {
/** All types with latest tag */
val latest: SelectFilter = SelectFilter(Set.empty, Tag.Latest)

/** All types with specified tag if it exists, otherwise latest */
val tagOrLatest: Option[Tag] => SelectFilter =
tag => SelectFilter(Set.empty, tag.getOrElse(Tag.Latest))

}
11 changes: 11 additions & 0 deletions docs/src/main/paradox/docs/releases/v1.9-release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,21 @@ It is now possible to aggregate resources by `@type` or `project`.

### Views

#### Indexing errors listing

Indexing errors can now be listed and filtered for a given view.

@ref:[More information](../delta/api/views/index.md#listing-indexing-failures)

#### Resource type filtering performance improvement

To improve indexing performance, the types defined in the
@ref:[FilterByType pipe](../delta/api/views/pipes.md#filter-by-type),
@ref:[Sparql View payload](../delta/api/views/sparql-view-api.md#payload), or the
@ref:[Composite View source payload](../delta/api/views/composite-view-api.md#sources)
are filtered in PostgreSQL rather than in Nexus Delta.
This avoids querying for data just to discard it straight away.

### Composite views

To enhance performance of indexing of composite views, Nexus Delta introduces the following features.
Expand Down