diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf
index 381fbfd66a..c76d59b3d7 100644
--- a/delta/app/src/main/resources/app.conf
+++ b/delta/app/src/main/resources/app.conf
@@ -49,6 +49,8 @@ app {
   # Database export configuration
   export {
     batch-size = 30
+    # Limit the number of events per file (this default value should give ~1GB files)
+    limit-per-file = 32000
     # Max number of concurrent exports
     permits = 1
     # Target directory for exports
diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ExportModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ExportModule.scala
index 0af4dc0e07..938d38d16b 100644
--- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ExportModule.scala
+++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ExportModule.scala
@@ -1,6 +1,5 @@
 package ch.epfl.bluebrain.nexus.delta.wiring
 
-import cats.effect.{Clock, IO}
 import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
 import ch.epfl.bluebrain.nexus.delta.config.AppConfig
 import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
@@ -16,8 +15,8 @@ import izumi.distage.model.definition.{Id, ModuleDef}
 
 // $COVERAGE-OFF$
 object ExportModule extends ModuleDef {
-  make[Exporter].fromEffect { (config: AppConfig, clock: Clock[IO], xas: Transactors) =>
-    Exporter(config.`export`, clock, xas)
+  make[Exporter].fromEffect { (config: AppConfig, xas: Transactors) =>
+    Exporter(config.`export`, xas)
   }
 
   make[ExportRoutes].from {
diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ExportRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ExportRoutesSpec.scala
index e56b715912..9e401dcdf8 100644
--- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ExportRoutesSpec.scala
+++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ExportRoutesSpec.scala
@@ -15,8 +15,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.{ExportEventQuery, Export
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
 import fs2.io.file.Path
 
-import java.time.Instant
-
 class ExportRoutesSpec extends BaseRouteSpec {
 
   private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm)))
@@ -31,7 +29,7 @@ class ExportRoutesSpec extends BaseRouteSpec {
 
   private val exporter = new Exporter {
     override def events(query: ExportEventQuery): IO[ExportResult] =
-      exportTrigger.set(true).as(ExportResult(Path("json"), Path("Success"), Instant.EPOCH, Instant.EPOCH))
+      exportTrigger.set(true).as(ExportResult(Path("target"), Path("success")))
   }
 
   private lazy val routes = Route.seal(
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala
index f8c0177865..d7978a9c58 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala
@@ -5,7 +5,7 @@ import pureconfig.ConfigConvert.catchReadError
 import pureconfig.generic.semiauto.deriveReader
 import pureconfig.{ConfigConvert, ConfigReader}
 
-final case class ExportConfig(batchSize: Int, permits: Int, target: Path)
+final case class ExportConfig(batchSize: Int, limitPerFile: Int, permits: Int, target: Path)
 
 object ExportConfig {
 
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala
index 3079de81d3..3191463803 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala
@@ -1,7 +1,6 @@
 package ch.epfl.bluebrain.nexus.delta.sourcing.exporter
 
 import cats.effect.IO
-import cats.effect.kernel.Clock
 import cats.effect.std.Semaphore
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
@@ -10,15 +9,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
 import doobie.Fragments
 import doobie.implicits._
 import doobie.util.query.Query0
 import fs2.Stream
-import fs2.io.file.{Files, Path}
+import fs2.io.file._
 import io.circe.syntax.EncoderOps
 
-import java.time.Instant
-
 trait Exporter {
 
   def events(query: ExportEventQuery): IO[ExportResult]
@@ -29,15 +27,16 @@ object Exporter {
 
   private val logger = Logger[Exporter]
 
-  final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant)
+  private val fileFormat = "%09d"
+
+  final case class ExportResult(targetDirectory: Path, success: Path)
 
-  def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] =
-    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config, _, clock, xas))
+  def apply(config: ExportConfig, xas: Transactors): IO[Exporter] =
+    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config, _, xas))
 
-  private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
-      extends Exporter {
+  private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], xas: Transactors) extends Exporter {
 
-    val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop)
+    private val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop)
     override def events(query: ExportEventQuery): IO[ExportResult] = {
       val projectFilter = Fragments.orOpt(
         query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" }
@@ -51,35 +50,43 @@ object Exporter {
              |""".stripMargin.query[RowEvent]
 
       val exportIO = for {
-        start          <- clock.realTimeInstant
         _              <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}")
         targetDirectory = config.target / query.output.value
         _              <- Files[IO].createDirectory(targetDirectory)
-        exportFile      = targetDirectory / s"$start.json"
-        _              <- exportToFile(q, query.offset, exportFile)
-        end            <- clock.realTimeInstant
-        exportSuccess   = targetDirectory / s"$start.success"
+        exportDuration <- exportToFile(q, query.offset, targetDirectory)
+        exportSuccess   = targetDirectory / s"${paddedOffset(query.offset)}.success"
         _              <- writeSuccessFile(query, exportSuccess)
         _              <- logger.info(
-                            s"Export for projects ${query.projects} from offset' ${query.offset}' after ${end.getEpochSecond - start.getEpochSecond} seconds."
+                            s"Export for projects ${query.projects} from offset '${query.offset.value}' completed after ${exportDuration.toSeconds} seconds."
                           )
-      } yield ExportResult(exportFile, exportSuccess, start, end)
+      } yield ExportResult(targetDirectory, exportSuccess)
       semaphore.permit.use { _ => exportIO }
     }
 
-    private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetFile: Path) = {
-      StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
-        .map(_.asJson.noSpaces)
-        .intersperse("\n")
-        .through(Files[IO].writeUtf8(targetFile))
+    private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetDirectory: Path) =
+      Stream
+        .eval(IO.ref(start))
+        .flatMap { offsetRef =>
+          def computePath = offsetRef.get.map { o =>
+            targetDirectory / s"${paddedOffset(o)}.json"
+          }
+
+          StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
+            .evalTap { rowEvent => offsetRef.set(rowEvent.ordering) }
+            .map(_.asJson.noSpaces)
+            .through(StreamingUtils.writeRotate(computePath, config.limitPerFile))
+        }
         .compile
         .drain
-    }
+        .timed
+        .map(_._1)
 
     private def writeSuccessFile(query: ExportEventQuery, targetFile: Path) =
       Stream(query.asJson.toString()).through(Files[IO].writeUtf8(targetFile)).compile.drain
+
+    private def paddedOffset(offset: Offset) = fileFormat.format(offset.value)
   }
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala
new file mode 100644
index 0000000000..967b1686c0
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala
@@ -0,0 +1,75 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils
+
+import cats.effect.{IO, Resource}
+import cats.effect.std.Hotswap
+import fs2.io.file.{FileHandle, Files, Flag, Flags, Path, WriteCursor}
+import fs2.{text, Pipe, Pull, Stream}
+
+object StreamingUtils {
+
+  private val flags = Flags.Write
+
+  private val lineSeparator = "\n"
+
+  private val newLine = Stream.emit(lineSeparator)
+
+  def readLines(path: Path) =
+    Files[IO].readUtf8Lines(path).filter(_.nonEmpty)
+
+  /**
+    * Writes all data to a sequence of files, each limited to a maximum number of lines
+    *
+    * Adapted from fs2.io.file.Files.writeRotate (which does not preserve line boundaries)
+    *
+    * @param computePath
+    *   computes the path of the first file and of each subsequent one
+    * @param limit
+    *   maximum number of lines per file
+    */
+  def writeRotate(computePath: IO[Path], limit: Int): Pipe[IO, String, Nothing] = {
+    def openNewFile: Resource[IO, FileHandle[IO]] =
+      Resource
+        .eval(computePath)
+        .flatMap(p => Files[IO].open(p, flags.addIfAbsent(Flag.Write)))
+
+    def newCursor(file: FileHandle[IO]): IO[WriteCursor[IO]] =
+      Files[IO].writeCursorFromFileHandle(file, flags.contains(Flag.Append))
+
+    def go(
+        fileHotswap: Hotswap[IO, FileHandle[IO]],
+        cursor: WriteCursor[IO],
+        acc: Int,
+        s: Stream[IO, String]
+    ): Pull[IO, Unit, Unit] = {
+      s.pull.unconsLimit(limit - acc).flatMap {
+        case Some((hd, tl)) =>
+          val newAcc    = acc + hd.size
+          val hdAsBytes =
+            Stream.chunk(hd).intersperse(lineSeparator).append(newLine).through(text.utf8.encode)
+          cursor.writeAll(hdAsBytes).flatMap { nc =>
+            if (newAcc >= limit)
+              Pull
+                .eval {
+                  fileHotswap
+                    .swap(openNewFile)
+                    .flatMap(newCursor)
+                }
+                .flatMap(nc => go(fileHotswap, nc, 0, tl))
+            else
+              go(fileHotswap, nc, newAcc, tl)
+          }
+        case None => Pull.done
+      }
+    }
+
+    in =>
+      Stream
+        .resource(Hotswap(openNewFile))
+        .flatMap { case (fileHotswap, fileHandle) =>
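+          // Open a write cursor on the file handle obtained from the Hotswap and start pulling
+          // lines through `go`, which swaps in a new file every time `limit` lines have been written.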
+          Stream.eval(newCursor(fileHandle)).flatMap { cursor =>
+            go(fileHotswap, cursor, 0, in).stream.drain
+          }
+        }
+  }
+
+}
diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala
index c202564f16..8bc4c720a4 100644
--- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala
+++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala
@@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User}
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
 import ch.epfl.bluebrain.nexus.testkit.clock.FixedClock
 import ch.epfl.bluebrain.nexus.testkit.file.TempDirectory
 import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
@@ -23,11 +24,11 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi
 
   private lazy val doobieFixture = doobieInject(
     PullRequest.eventStore(_, event1, event2, event3, event4, event5, event6),
-    Exporter(exporterConfig, clock, _)
+    Exporter(exporterConfig, _)
   )
 
   override def munitFixtures: Seq[AnyFixture[_]] = List(tempDirectory, doobieFixture)
 
-  private lazy val exporterConfig   = ExportConfig(5, 3, exportDirectory)
+  private lazy val exporterConfig   = ExportConfig(5, 4, 3, exportDirectory)
   private lazy val (_, _, exporter) = doobieFixture()
   private lazy val exportDirectory  = tempDirectory()
@@ -55,13 +56,21 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi
 
   private def orderingValue(obj: JsonObject) = obj("ordering").flatMap(_.asNumber.flatMap(_.toInt))
 
-  private def readJsonLines(path: Path) = Files[IO]
-    .readUtf8Lines(path)
-    .evalMap { line =>
-      IO.fromEither(parseAsObject(line))
-    }
-    .compile
-    .toList
+  private def readDataFiles(path: Path): IO[(Int, List[JsonObject])] = {
+    def readDataFile(path: Path) =
+      StreamingUtils
+        .readLines(path)
+        .evalMap { line =>
+          IO.fromEither(parseAsObject(line))
+        }
+        .compile
+        .toList
+
+    for {
+      dataFiles  <- Files[IO].list(path).filter(_.extName.equals(".json")).compile.toList
+      jsonEvents <- dataFiles.sortBy(_.fileName.toString).flatTraverse(readDataFile)
+    } yield (dataFiles.size, jsonEvents)
+  }
 
   private def readSuccess(path: Path) = Files[IO]
     .readUtf8(path)
@@ -71,21 +80,24 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi
     .compile
     .lastOrError
 
-  private def assertExport(result: Exporter.ExportResult, query: ExportEventQuery, expectedOrdering: List[Int])(implicit
-      location: Location
-  ) =
-    for {
-      exportContent <- readJsonLines(result.json)
-      orderingValues = exportContent.mapFilter(orderingValue)
-      _              = assertEquals(orderingValues, expectedOrdering)
-      _             <- readSuccess(result.success).assertEquals(query)
-    } yield ()
+  private def assertExport(
+      result: Exporter.ExportResult,
+      query: ExportEventQuery,
+      expectedFileCount: Int,
+      expectedOrdering: List[Int]
+  )(implicit location: Location) =
+    readDataFiles(result.targetDirectory).map { case (fileCount, exportContent) =>
+      assertEquals(fileCount, expectedFileCount)
+      val orderingValues = exportContent.mapFilter(orderingValue)
+      assertEquals(orderingValues, expectedOrdering)
+    } >>
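+      // the success marker must echo back the export query that produced the files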
+      readSuccess(result.success).assertEquals(query)
 
   test(s"Export all events for $project1 and $project3") {
     val query = ExportEventQuery(Label.unsafe("export1"), NonEmptyList.of(project1, project3), Offset.start)
     for {
       result <- exporter.events(query)
-      _      <- assertExport(result, query, List(1, 2, 3, 4, 6))
+      _      <- assertExport(result, query, 2, List(1, 2, 3, 4, 6))
     } yield ()
   }
 
@@ -93,7 +105,7 @@
     val query = ExportEventQuery(Label.unsafe("export2"), NonEmptyList.of(project1, project3), Offset.at(2L))
     for {
       result <- exporter.events(query)
-      _      <- assertExport(result, query, List(3, 4, 6))
+      _      <- assertExport(result, query, 1, List(3, 4, 6))
     } yield ()
   }
 }
diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala
new file mode 100644
index 0000000000..d069b79ef1
--- /dev/null
+++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala
@@ -0,0 +1,34 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils
+
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.testkit.file.TempDirectory
+import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
+import fs2.Stream
+import fs2.io.file.Files
+import munit.AnyFixture
+
+class StreamingUtilsSuite extends NexusSuite with TempDirectory.Fixture {
+
+  override def munitFixtures: Seq[AnyFixture[_]] = List(tempDirectory)
+
+  private lazy val exportDirectory = tempDirectory()
+
+  private val limitPerFile = 3
+  private val lines        = Stream.emits(List("A", "B", "C", "D", "E"))
+
+  test(s"Write a stream of lines into files, rotating every $limitPerFile lines") {
+    for {
+      refCompute <- IO.ref(0)
+      computePath = refCompute
+                      .updateAndGet(_ + 1)
+                      .map { counter => exportDirectory / s"part-$counter.txt" }
+      _          <- lines.through(StreamingUtils.writeRotate(computePath, limitPerFile)).compile.drain
+      _          <- Files[IO].list(exportDirectory).assertSize(2)
+      firstFile   = exportDirectory / "part-1.txt"
+      _          <- Files[IO].readUtf8Lines(firstFile).assert("A", "B", "C")
+      secondFile  = exportDirectory / "part-2.txt"
+      _          <- Files[IO].readUtf8Lines(secondFile).assert("D", "E")
+    } yield ()
+  }
+
+}
diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/file/TempDirectory.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/file/TempDirectory.scala
index ed78fa59cf..a815e3d5b1 100644
--- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/file/TempDirectory.scala
+++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/file/TempDirectory.scala
@@ -12,7 +12,7 @@ object TempDirectory {
 
   val tempDirectory: IOFixture[Path] = ResourceSuiteLocalFixture(
     "tempDirectory",
-    Resource.make(Files[IO].createTempDirectory)(Files[IO].deleteRecursively)
+    Resource.make(Files[IO].createTempDirectory)(_ => IO.unit)
   )
 
 }
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala
index 5e678a713b..69f42cbce3 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventStreamer.scala
@@ -6,6 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
 import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
 import ch.epfl.bluebrain.nexus.ship.EventStreamer.logger
 import fs2.io.file.{Files, Path}
 import fs2.{text, Stream}
@@ -57,6 +58,7 @@ object EventStreamer {
           .readFileMultipart(bucket, path.toString)
           .through(text.utf8.decode)
           .through(text.lines)
+          .filter(_.nonEmpty)
       } yield lines
 
       override def fileList(path: Path): IO[List[Path]] =
@@ -73,7 +75,7 @@ object EventStreamer {
 
     def localStreamer: EventStreamer = new EventStreamer {
      override def streamLines(path: Path): Stream[IO, String] =
-        Files[IO].readUtf8Lines(path)
+        StreamingUtils.readLines(path)
 
      override def fileList(path: Path): IO[List[Path]] =
        Files[IO].list(path).compile.toList
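For context, a minimal sketch (not part of the patch) of how a consumer could read a rotated export back in order: the `.json` parts written by `exportToFile` are named with a zero-padded offset (via `paddedOffset` and the "%09d" format), so sorting the file names lexicographically restores offset order before streaming their non-empty lines with `StreamingUtils.readLines`. The object and method names below (`ExportReadBackSketch`, `readExport`) are hypothetical.

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils
import fs2.Stream
import fs2.io.file.{Files, Path}

object ExportReadBackSketch {

  // Hypothetical helper: list the rotated `.json` parts of an export, sort them by their
  // zero-padded file names and stream their non-empty lines back in offset order.
  def readExport(targetDirectory: Path): Stream[IO, String] =
    Stream
      .evals(
        Files[IO]
          .list(targetDirectory)
          .filter(_.extName == ".json")
          .compile
          .toList
          .map(_.sortBy(_.fileName.toString))
      )
      .flatMap(StreamingUtils.readLines)
}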