Skip to content

Commit

Permalink
Simplify NewCompositeSink
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski committed Jul 25, 2023
1 parent 51bc480 commit d98c0c7
Showing 1 changed file with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.NewQueryGra
import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import fs2.Chunk
import monix.bio.Task
Expand Down Expand Up @@ -39,25 +38,22 @@ final class NewCompositeSink[SinkFormat](
override type In = GraphResource
override def inType: Typeable[GraphResource] = Typeable[GraphResource]

private def query(elements: Chunk[Elem[GraphResource]]): Task[Option[Graph]] = {
val elementsForQuery = elements.mapFilter {
case e: SuccessElem[GraphResource] => Some(e.value)
case _: FailedElem => None
case _: DroppedElem => None
private def query(elements: Chunk[Elem[GraphResource]]): Task[Option[Graph]] =
elements.mapFilter(elem => elem.toOption) match {
case elems if elems.nonEmpty => queryGraph(elems)
case _ => Task.none
}
queryGraph(elementsForQuery)
}

private def transformAndSink(elements: Chunk[Elem[GraphResource]], graph: Graph) =
elements
.traverse {
case e: SuccessElem[GraphResource] =>
e.evalMapFilter(g => transform(g.copy(graph = graph.copy(rootNode = g.id))))
case e: FailedElem => Task.pure(e)
case e: DroppedElem => Task.pure(e)
.traverse { elem =>
elem.evalMapFilter(gr => transform(replaceGraph(gr, graph)))
}
.flatMap(sink)

private def replaceGraph(gr: GraphResource, graph: Graph) =
gr.copy(graph = graph.copy(rootNode = gr.id))

override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] = {
val graph = query(elements)

Expand Down

0 comments on commit d98c0c7

Please sign in to comment.