
readTimestampOffset
* read from each slice, desc timestamp and limit
patriknw committed Nov 20, 2024
1 parent 6d3621e commit 3cfb8e4
Showing 3 changed files with 26 additions and 15 deletions.
@@ -22,7 +22,7 @@ import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice
 @InternalApi
 private[projection] trait OffsetStoreDao {
 
-  def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]]
+  def readTimestampOffset(slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]]
 
   def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]]
 
@@ -64,8 +64,8 @@ private[projection] class PostgresOffsetStoreDao(
 
   private val selectTimestampOffsetSql: String =
     sql"""
-    SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset
-    FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?"""
+    SELECT projection_key, persistence_id, seq_nr, timestamp_offset
+    FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? ORDER BY timestamp_offset DESC LIMIT ?"""
 
   protected def createSelectOneTimestampOffsetSql: String =
     sql"""
@@ -192,25 +192,20 @@ private[projection] class PostgresOffsetStoreDao(
         s"Expected BySlicesSourceProvider to be defined when TimestampOffset is used.")
   }
 
-  override def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
-    val (minSlice, maxSlice) = {
-      sourceProvider match {
-        case Some(provider) => (provider.minSlice, provider.maxSlice)
-        case None           => (0, persistenceExt.numberOfSlices - 1)
-      }
-    }
+  override def readTimestampOffset(
+      slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
     r2dbcExecutor.select("read timestamp offset")(
       conn => {
         logger.trace("reading timestamp offset for [{}]", projectionId)
+        val limit = 1000 // FIXME config
         conn
           .createStatement(selectTimestampOffsetSql)
-          .bind(0, minSlice)
-          .bind(1, maxSlice)
-          .bind(2, projectionId.name)
+          .bind(0, slice)
+          .bind(1, projectionId.name)
+          .bind(2, limit)
       },
       row => {
         val projectionKey = row.get("projection_key", classOf[String])
-        val slice = row.get("slice", classOf[java.lang.Integer])
         val pid = row.get("persistence_id", classOf[String])
         val seqNr = row.get("seq_nr", classOf[java.lang.Long])
         val timestamp = row.getTimestamp("timestamp_offset")
@@ -41,6 +41,9 @@ import scala.collection.immutable.TreeSet
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+
 /**
  * INTERNAL API
  */
@@ -330,16 +333,29 @@ private[projection] class R2dbcOffsetStore(
   }
 
   private def readTimestampOffset(): Future[Option[TimestampOffset]] = {
+    implicit val sys = system // for implicit stream materializer
     triggerDeletionPerSlice.clear()
     val oldState = state.get()
 
-    dao.readTimestampOffset().map { recordsWithKey =>
+    val offsetSliceReadParallelism = 10 // FIXME config
+
+    val recordsWithKeyFut =
+      Source(minSlice to maxSlice)
+        .mapAsyncUnordered(offsetSliceReadParallelism) { slice =>
+          dao.readTimestampOffset(slice)
+        }
+        .mapConcat(identity)
+        .runWith(Sink.seq)
+        .map(_.toVector)(ExecutionContext.parasitic)
+
+    recordsWithKeyFut.map { recordsWithKey =>
       clearInflight()
       clearForeignOffsets()
       clearLatestSeen()
 
       val newState = {
         val s = State(recordsWithKey.map(_.record))
+        // FIXME shall we evict here, or how does that impact the logic for moreThanOneProjectionKey and foreignOffsets?
         (minSlice to maxSlice).foldLeft(s) {
           case (acc, slice) => acc.evict(slice, settings.timeWindow)
         }
