Skip to content

Commit

Permalink
Add caching to lookupSchemasUntil function
Browse files Browse the repository at this point in the history
The lookupSchemasUntilResult function is also implemented. It is the variant of lookupSchemasUntil
that returns a ResolverResult, which contains information about the cache item.

Also, mimaPreviousVersions is cleared out because this commit breaks binary compatibility with previous versions:
it adds a new function to a trait, so the list of previous versions to check against had to be emptied.
  • Loading branch information
spenes committed Sep 27, 2024
1 parent 2dc120a commit 0037573
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ trait CreateResolverCache[F[_]] {

def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]]

/**
 * Creates the map used to cache schema content list lookups, keyed by `SchemaKey`.
 * NOTE(review): `size` is presumably the capacity of the map -- confirm against `LruMap`.
 */
def createSchemaContentListCache(size: Int): F[LruMap[F, SchemaKey, SchemaContentListCacheEntry]]

def createMutex[K]: F[ResolverMutex[F, K]]

}
Expand All @@ -43,6 +45,10 @@ object CreateResolverCache {
override def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] =
createLruMap(size)

/**
 * Builds the schema content list cache with the same `createLruMap` helper
 * that `createSchemaListCache` uses, passing `size` through unchanged.
 */
override def createSchemaContentListCache(
size: Int
): F[LruMap[F, SchemaKey, SchemaContentListCacheEntry]] =
createLruMap(size)
}

implicit def idCreateResolverCache: CreateResolverCache[Id] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
)) && custom.values.flatMap(_.errors).forall(_ == RegistryError.NotFound)
}

/**
 * Convenience variant of [[lookupSchemasUntilResult]] that drops the
 * `ResolverResult` wrapper and returns only the schemas it carries.
 *
 * @param maxSchemaKey schema key handed unchanged to `lookupSchemasUntilResult`
 * @return the `value` of a successful lookup result, or the
 *         [[Resolver.SchemaResolutionError]] produced by `lookupSchemasUntilResult`
 */
def lookupSchemasUntil(
  maxSchemaKey: SchemaKey
)(implicit
  F: Monad[F],
  L: RegistryLookup[F],
  C: Clock[F]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] =
  F.map(lookupSchemasUntilResult(maxSchemaKey)) { lookupResult =>
    // Keep the error side as is; unwrap only the successful ResolverResult.
    lookupResult.map(wrapped => wrapped.value)
  }

/**
* Looks up all the schemas with the same model until `maxSchemaKey`.
* For the schemas of previous revisions, it starts with addition = 0
Expand All @@ -209,49 +222,95 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
* @return All the schemas if all went well, [[Resolver.SchemaResolutionError]] with the first error that happened
* while looking up the schemas if something went wrong.
*/
def lookupSchemasUntil(
def lookupSchemasUntilResult(
maxSchemaKey: SchemaKey
)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
def go(
current: SchemaVer.Full,
acc: List[SelfDescribingSchema[Json]]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
val currentSchemaKey = maxSchemaKey.copy(version = current)
lookupSchema(currentSchemaKey).flatMap {
case Left(e) =>
if (current.addition === 0)
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
else if (current.revision < maxSchemaKey.version.revision && isNotFound(e))
go(current.copy(revision = current.revision + 1, addition = 0), acc)
else
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
case Right(json) =>
if (current.revision < maxSchemaKey.version.revision)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else if (current.addition < maxSchemaKey.version.addition)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else
Monad[F].pure(
Right(
NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse
): F[Either[SchemaResolutionError, SchemaContentListLookupResult]] = {
def get(): F[Either[SchemaResolutionError, SchemaContentList]] = {
def go(
current: SchemaVer.Full,
acc: List[SelfDescribingSchema[Json]]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
val currentSchemaKey = maxSchemaKey.copy(version = current)
lookupSchema(currentSchemaKey).flatMap {
case Left(e) =>
if (current.addition === 0)
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
else if (current.revision < maxSchemaKey.version.revision && isNotFound(e))
go(current.copy(revision = current.revision + 1, addition = 0), acc)
else
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
case Right(json) =>
if (current.revision < maxSchemaKey.version.revision)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
)
else if (current.addition < maxSchemaKey.version.addition)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else
Monad[F].pure(
Right(
NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse
)
)
}
}

go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil)
}

go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil)
/**
 * Post-processes a freshly fetched result: stores it in the cache when one
 * is configured and wraps the outcome into a `ResolverResult`.
 *
 * Without a cache, a success is returned as `ResolverResult.NotCached` and
 * an error is returned untouched. With a cache, the answer of
 * `putSchemaContentListResult` decides the outcome: a `Right` becomes
 * `ResolverResult.Cached`, a `Left` becomes a `SchemaResolutionError`.
 * NOTE(review): when exactly the cache answers `Left`/`Right` depends on
 * `putItemResult`, which is not visible here.
 */
def handleAfterFetch(
  result: Either[SchemaResolutionError, SchemaContentList]
): F[Either[SchemaResolutionError, SchemaContentListLookupResult]] =
  cache match {
    case None =>
      // No cache configured: nothing to store, so mark a success as not cached.
      Monad[F].pure(
        result.map[SchemaContentListLookupResult](fetched => ResolverResult.NotCached(fetched))
      )
    case Some(resolverCache) =>
      // The cache works with failures keyed by registry, so convert the error side first.
      val cacheable = result.leftMap(err => resolutionErrorToFailureMap(err))
      resolverCache.putSchemaContentListResult(maxSchemaKey, cacheable).map {
        case Left(failures) =>
          // Report the key the fetch failed on; fall back to maxSchemaKey if the fetch succeeded.
          val failedKey = result.fold(_.schemaKey, _ => maxSchemaKey)
          Left(SchemaResolutionError(failedKey, resolutionError(failures)))
        case Right(ResolverCache.TimestampedItem(schemas, timestamp)) =>
          Right(ResolverResult.Cached(maxSchemaKey, schemas, timestamp))
      }
  }

/**
 * Runs the lookup under the lock for `maxSchemaKey`.
 *
 * The cache is checked again inside the lock: a cached success is returned
 * directly, while a cached failure or a miss triggers a fresh fetch via
 * `get()` followed by `handleAfterFetch`.
 * NOTE(review): the re-check presumably avoids repeating a fetch that another
 * caller completed while this one waited for the lock -- confirm intent.
 */
def lockAndLookup: F[Either[SchemaResolutionError, SchemaContentListLookupResult]] =
  withLockOnSchemaContentList(maxSchemaKey) {
    getSchemaContentListFromCache(maxSchemaKey).flatMap {
      case Some(TimestampedItem(Right(schemas), timestamp)) =>
        Monad[F].pure(Right(ResolverResult.Cached(maxSchemaKey, schemas, timestamp)))
      case Some(TimestampedItem(Left(_), _)) | None =>
        get().flatMap(fetched => handleAfterFetch(fetched))
    }
  }

getSchemaContentListFromCache(maxSchemaKey).flatMap {
case Some(TimestampedItem(Right(i), t)) =>
Monad[F].pure(Right(ResolverResult.Cached(maxSchemaKey, i, t)))
case Some(TimestampedItem(Left(_), _)) | None =>
lockAndLookup
}
}

/**
 * Converts the failures carried by a `SchemaResolutionError` into a
 * `LookupFailureMap` keyed by registry.
 *
 * Each entry's key is matched against `config.name` of the registries in
 * `allRepos`; entries whose key matches no registry are dropped.
 */
def resolutionErrorToFailureMap(resolutionError: SchemaResolutionError): LookupFailureMap = {
  val failuresByName = resolutionError.error.value.toMap
  failuresByName.flatMap { case (repoName, failure) =>
    allRepos.find(repo => repo.config.name == repoName).map(repo => (repo, failure))
  }
}

/**
* Get list of available schemas for particular vendor and name part
* Server supposed to return them in proper order
Expand Down Expand Up @@ -428,6 +487,12 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
case None => f
}

/**
 * Runs `f` under the cache's schema content list lock for `schemaKey`,
 * or runs `f` as is when no cache is configured.
 */
private def withLockOnSchemaContentList[A](schemaKey: SchemaKey)(f: => F[A]): F[A] =
  cache.fold(f)(_.withLockOnSchemaContentList(schemaKey)(f))

private def getSchemaListFromCache(
vendor: Vendor,
name: Name,
Expand All @@ -441,15 +506,26 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
case None => Monad[F].pure(None)
}

/**
 * Reads the timestamped schema content list entry for `schemaKey` from the
 * cache; yields `None` when no cache is configured.
 */
private def getSchemaContentListFromCache(
  schemaKey: SchemaKey
)(implicit
  F: Monad[F],
  C: Clock[F]
): F[Option[ResolverCache.TimestampedItem[SchemaContentListLookup]]] =
  cache.fold(
    F.pure(Option.empty[ResolverCache.TimestampedItem[SchemaContentListLookup]])
  )(_.getTimestampedSchemaContentList(schemaKey))
}

/** Companion object. Lets us create a Resolver from a Json */
object Resolver {

type SchemaListKey = (Vendor, Name, Model)
type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem]
type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList]
type SupersededBy = Option[SchemaVer.Full]
type SchemaListKey = (Vendor, Name, Model)
type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem]
type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList]
type SchemaContentListLookupResult = ResolverResult[SchemaKey, SchemaContentList]
type SupersededBy = Option[SchemaVer.Full]

/**
* The result of doing schema lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import Resolver.SchemaItem
class ResolverCache[F[_]] private (
schemas: LruMap[F, SchemaKey, SchemaCacheEntry],
schemaLists: LruMap[F, ListCacheKey, ListCacheEntry],
schemaContentLists: LruMap[F, SchemaKey, SchemaContentListCacheEntry],
schemaMutex: ResolverMutex[F, SchemaKey],
schemaListMutex: ResolverMutex[F, ListCacheKey],
schemaContentListMutex: ResolverMutex[F, SchemaKey],
val ttl: Option[TTL]
) {

Expand Down Expand Up @@ -144,6 +146,23 @@ class ResolverCache[F[_]] private (
f: => F[A]
): F[A] =
schemaListMutex.withLockOn((vendor, name, model))(f)

/**
 * Looks up the schema content list entry stored under `schemaKey`.
 * Delegates to `getTimestampedItem` with this cache's `ttl` and the
 * `schemaContentLists` map.
 * NOTE(review): expiry handling presumably lives in `getTimestampedItem` -- confirm there.
 */
private[resolver] def getTimestampedSchemaContentList(
schemaKey: SchemaKey
)(implicit F: Monad[F], C: Clock[F]): F[Option[TimestampedItem[SchemaContentListLookup]]] =
getTimestampedItem(ttl, schemaContentLists, schemaKey)

/**
 * Stores a schema content list lookup result under `schemaKey` by delegating
 * to `putItemResult` on the `schemaContentLists` map.
 *
 * @param schemaKey key the result is stored under
 * @param schemas lookup result to store: a failure map or a list of schemas
 * @return either a failure map or the timestamped schema list, as decided by
 *         `putItemResult` (its rules are not visible in this part of the file)
 */
private[resolver] def putSchemaContentListResult(
schemaKey: SchemaKey,
schemas: SchemaContentListLookup
)(implicit
F: Monad[F],
C: Clock[F]
): F[Either[LookupFailureMap, TimestampedItem[SchemaContentList]]] =
putItemResult(schemaContentLists, schemaKey, schemas)

/** Runs `f` through `schemaContentListMutex.withLockOn` for the given `key` */
private[resolver] def withLockOnSchemaContentList[A](key: SchemaKey)(f: => F[A]): F[A] =
schemaContentListMutex.withLockOn(key)(f)
}

object ResolverCache {
Expand All @@ -159,11 +178,21 @@ object ResolverCache {
): F[Option[ResolverCache[F]]] = {
if (shouldCreateResolverCache(size, ttl)) {
for {
schemas <- C.createSchemaCache(size)
schemaLists <- C.createSchemaListCache(size)
schemaMutex <- C.createMutex[SchemaKey]
listMutex <- C.createMutex[ListCacheKey]
} yield new ResolverCache[F](schemas, schemaLists, schemaMutex, listMutex, ttl).some
schemas <- C.createSchemaCache(size)
schemaLists <- C.createSchemaListCache(size)
schemaContentLists <- C.createSchemaContentListCache(size)
schemaMutex <- C.createMutex[SchemaKey]
listMutex <- C.createMutex[ListCacheKey]
schemaContentListMutex <- C.createMutex[SchemaKey]
} yield new ResolverCache[F](
schemas,
schemaLists,
schemaContentLists,
schemaMutex,
listMutex,
schemaContentListMutex,
ttl
).some
} else
Applicative[F].pure(none)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ package com.snowplowanalytics.iglu.client

import scala.concurrent.duration.FiniteDuration

import cats.data.NonEmptyList

import io.circe.Json

// Iglu Core
import com.snowplowanalytics.iglu.core.SchemaList
import com.snowplowanalytics.iglu.core.{SchemaList, SelfDescribingSchema}

// This project
import resolver.registries.Registry
Expand All @@ -32,6 +36,8 @@ package object resolver {
/** Schema's model */
type Model = Int

/** Non-empty list of self-describing schemas with JSON content */
type SchemaContentList = NonEmptyList[SelfDescribingSchema[Json]]

/**
* Map of all repositories to its aggregated state of failure
* None as value means repository already responded with `not-found`,
Expand All @@ -53,6 +59,13 @@ package object resolver {
*/
type ListLookup = Either[LookupFailureMap, SchemaList]

/**
 * Validated schema content list lookup result, as stored in the cache:
 * the list of self-describing schemas in case of success, or a
 * map of all currently failed repositories in case of failure
 */
type SchemaContentListLookup = Either[LookupFailureMap, SchemaContentList]

/** Time to live for cached items */
type TTL = FiniteDuration

Expand All @@ -77,4 +90,7 @@ package object resolver {
/** Cache entry for schema list lookup results */
type ListCacheEntry = CacheEntry[ListLookup]

/** Cache entry for schema content list lookup results */
type SchemaContentListCacheEntry = CacheEntry[SchemaContentListLookup]

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ResolverCacheSpec extends Specification {
4.millis,
List((key, (2.millis, Right(SchemaItem(Json.Null, None))))),
5,
List()
List(),
Map.empty
)

val test = for {
Expand Down
Loading

0 comments on commit 0037573

Please sign in to comment.