diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala index 14f037868..e2f0c92b7 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala @@ -22,7 +22,7 @@ import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice @InternalApi private[projection] trait OffsetStoreDao { - def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] + def readTimestampOffset(slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 09e5cce51..074dcb8dd 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -64,8 +64,8 @@ private[projection] class PostgresOffsetStoreDao( private val selectTimestampOffsetSql: String = sql""" - SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset - FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?""" + SELECT projection_key, persistence_id, seq_nr, timestamp_offset + FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? ORDER BY timestamp_offset DESC LIMIT ?""" protected def createSelectOneTimestampOffsetSql: String = sql""" @@ -192,25 +192,20 @@ private[projection] class PostgresOffsetStoreDao( s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.") } - override def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = { - val (minSlice, maxSlice) = { - sourceProvider match { - case Some(provider) => (provider.minSlice, provider.maxSlice) - case None => (0, persistenceExt.numberOfSlices - 1) - } - } + override def readTimestampOffset( + slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = { r2dbcExecutor.select("read timestamp offset")( conn => { logger.trace("reading timestamp offset for [{}]", projectionId) + val limit = 1000 // FIXME config conn .createStatement(selectTimestampOffsetSql) - .bind(0, minSlice) - .bind(1, maxSlice) - .bind(2, projectionId.name) + .bind(0, slice) + .bind(1, projectionId.name) + .bind(2, limit) }, row => { val projectionKey = row.get("projection_key", classOf[String]) - val slice = row.get("slice", classOf[java.lang.Integer]) val pid = row.get("persistence_id", classOf[String]) val seqNr = row.get("seq_nr", classOf[java.lang.Long]) val timestamp = row.getTimestamp("timestamp_offset") diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index c92c307fb..7e7c8abed 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -41,6 +41,9 @@ import scala.collection.immutable.TreeSet import scala.concurrent.ExecutionContext import scala.concurrent.Future +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + /** * INTERNAL API */ @@ -330,16 +333,29 @@ private[projection] class R2dbcOffsetStore( } private def readTimestampOffset(): Future[Option[TimestampOffset]] = { + implicit val sys = system // for implicit stream materializer triggerDeletionPerSlice.clear() val oldState = state.get() - dao.readTimestampOffset().map { recordsWithKey => + val offsetSliceReadParallelism = 10 // FIXME config + + val recordsWithKeyFut = + Source(minSlice to maxSlice) + .mapAsyncUnordered(offsetSliceReadParallelism) { slice => + dao.readTimestampOffset(slice) + } + .mapConcat(identity) + .runWith(Sink.seq) + .map(_.toVector)(ExecutionContext.parasitic) + + recordsWithKeyFut.map { recordsWithKey => clearInflight() clearForeignOffsets() clearLatestSeen() val newState = { val s = State(recordsWithKey.map(_.record)) + // FIXME shall we evict here, or how does that impact the logic for moreThanOneProjectionKey and foreignOffsets? (minSlice to maxSlice).foldLeft(s) { case (acc, slice) => acc.evict(slice, settings.timeWindow) }