Skip to content

Commit

Permalink
Merge branch 'main' into ns-fix-ttl-check
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego Duarte committed Jan 9, 2025
2 parents 152136b + 60ff3a2 commit 30e3a1d
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 257 deletions.
74 changes: 39 additions & 35 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"sync"
"time"

"github.com/geniussportsgroup/gateway_cache/v2/models"
"github.com/geniussportsgroup/gateway_cache/v2/reporter"
"github.com/geniussportsgroup/gateway_cache/v3/reporter"
)

// State that a cache entry could have
const (
AVAILABLE models.EntryState = iota
AVAILABLE EntryState = iota
COMPUTING
COMPUTED
FAILED5xx
Expand All @@ -23,7 +22,7 @@ const (
)

const (
Status4xx models.CodeStatus = iota
Status4xx CodeStatus = iota
Status4xxCached
Status5xx
Status5xxCached
Expand All @@ -41,7 +40,7 @@ type CacheEntry[K any] struct {
expirationTime time.Time
prev *CacheEntry[K]
next *CacheEntry[K]
state models.EntryState // AVAILABLE, COMPUTING, etc
state EntryState // AVAILABLE, COMPUTING, etc
err error
}

Expand All @@ -62,10 +61,12 @@ type CacheDriver[K any, T any] struct {
extendedCapacity int
numEntries int
toCompress bool
processor ProcessorI[K, T]
transformer TransformerI[T]
compressor CompressorI
reporter Reporter
// processor ProcessorI[
cacheMissSolver func(K, ...interface{}) (T, *RequestError) //we will leave the pre process logic for this function
toMapKey func(K) (string, error)
transformer TransformerI[T]
compressor CompressorI
reporter Reporter
}

func (cache *CacheDriver[T, K]) MissCount() int {
Expand Down Expand Up @@ -105,7 +106,7 @@ func (cache *CacheDriver[T, K]) TTLForNegative() time.Duration {
// LazyRemove removes the entry with keyVal from the cache. It does not remove the entry immediately, but it marks it as removed.
func (cache *CacheDriver[T, K]) LazyRemove(keyVal T) error {

key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +142,7 @@ func (cache *CacheDriver[T, K]) LazyRemove(keyVal T) error {

func (cache *CacheDriver[T, K]) Touch(keyVal T) error {

key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return err
}
Expand Down Expand Up @@ -200,7 +201,8 @@ func New[K any, T any](
capFactor float64,
ttl time.Duration,
ttlForNegative time.Duration,
processor ProcessorI[K, T],
missSolver func(K, ...interface{}) (T, *RequestError),
toMapKey func(K) (string, error),
options ...Options[K, T],
) *CacheDriver[K, T] {

Expand All @@ -219,7 +221,8 @@ func New[K any, T any](
ttl: ttl,
ttlForNegative: ttlForNegative,
table: make(map[string]*CacheEntry[T], int(extendedCapacity)),
processor: processor,
cacheMissSolver: missSolver,
toMapKey: toMapKey,
compressor: lz4Compressor{},
reporter: &reporter.Default{},
}
Expand Down Expand Up @@ -282,17 +285,18 @@ func New[K any, T any](
// func (_ *DefaultTransformer[T]) ValueToBytes(in T) ([]byte, error) {
// return json.Marshal(in)
// }
func NewWithCompression[T any, K any](
func NewWithCompression[K any, T any](
capacity int,
capFactor float64,
ttl time.Duration,
ttlForNegative time.Duration,
processor ProcessorI[T, K],
compressor TransformerI[K],
options ...Options[T, K],
) (cache *CacheDriver[T, K]) {
missSolver func(K, ...interface{}) (T, *RequestError),
toMapKey func(K) (string, error),
compressor TransformerI[T],
options ...Options[K, T],
) (cache *CacheDriver[K, T]) {

cache = New(capacity, capFactor, ttl, ttlForNegative, processor, options...)
cache = New(capacity, capFactor, ttl, ttlForNegative, missSolver, toMapKey, options...)
if cache != nil {
cache.toCompress = true
cache.transformer = compressor
Expand Down Expand Up @@ -335,7 +339,7 @@ func (cache *CacheDriver[T, K]) isMru(entry *CacheEntry[K]) bool {
}

func (cache *CacheDriver[T, K]) isKeyLru(keyVal T) (bool, error) {
key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return false, err
}
Expand All @@ -349,7 +353,7 @@ func (cache *CacheDriver[T, K]) isKeyLru(keyVal T) (bool, error) {

func (cache *CacheDriver[T, K]) isKeyMru(keyVal T) (bool, error) {

key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -414,15 +418,15 @@ func (cache *CacheDriver[T, K]) allocateEntry(
// ready. If the request is not the first but the result is not still ready, then it blocks
// until the result is ready
func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T,
other ...interface{}) (K, *models.RequestError) {
other ...interface{}) (K, *RequestError) {

var requestError *models.RequestError
var requestError *RequestError
var zeroK K
payload := request

cacheKey, err := cache.processor.ToMapKey(payload)
cacheKey, err := cache.toMapKey(payload)
if err != nil {
return zeroK, &models.RequestError{
return zeroK, &RequestError{
Error: err,
Code: Status4xx,
}
Expand All @@ -449,13 +453,13 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T,

if entry.state == FAILED5xx {
// entry.expirationTime = currTime.Add(cache.ttlForNegative)
return zeroK, &models.RequestError{
return zeroK, &RequestError{
Error: entry.err,
Code: Status5xxCached, // include 4xx and 5xx
}
} else if entry.state == FAILED4xx {
// entry.expirationTime = currTime.Add(cache.ttlForNegative)
return zeroK, &models.RequestError{
return zeroK, &RequestError{
Error: entry.err,
Code: Status4xxCached, // include 4xx and 5xx
}
Expand All @@ -468,15 +472,15 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T,

buf, err := cache.compressor.Decompress(entry.postProcessedResponseCompressed)
if err != nil {
return zeroK, &models.RequestError{
return zeroK, &RequestError{
Error: errors.New("cannot decompress stored message"),
Code: Status5xx, // include 4xx and 5xx
}
}

result, err := cache.transformer.BytesToValue(buf)
if err != nil {
return zeroK, &models.RequestError{
return zeroK, &RequestError{
Error: errors.New("cannot convert decompressed stored message"),
Code: Status5xx, // include 4xx and 5xx
}
Expand All @@ -493,7 +497,7 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T,
if err != nil {
cache.lock.Unlock() // an error getting cache entry ==> we invoke directly the uservice
// return cache.callUServices(request, payload, other...)
return cache.processor.CacheMissSolver(request, other...)
return cache.cacheMissSolver(request, other...)
}

entry.state = COMPUTING
Expand All @@ -504,7 +508,7 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T,
entry.lock.Lock() // other requests will wait for until postProcessedResponse is gotten
defer entry.lock.Unlock()

retVal, requestError := cache.processor.CacheMissSolver(request, other...)
retVal, requestError := cache.cacheMissSolver(request, other...)
if requestError != nil {
switch requestError.Code {
case Status4xx, Status4xxCached:
Expand Down Expand Up @@ -551,7 +555,7 @@ func (cache *CacheDriver[T, K]) remove(entry *CacheEntry[K]) {

// has return true is state in the cache
func (cache *CacheDriver[T, K]) has(val T) bool {
key, err := cache.processor.ToMapKey(val)
key, err := cache.toMapKey(val)
// key, err := cache.toMapKey(val)
if err != nil {
return false
Expand Down Expand Up @@ -706,7 +710,7 @@ func (cache *CacheDriver[T, K]) Set(capacity int, ttl time.Duration, ttlForNegat

func (cache *CacheDriver[T, K]) RetrieveValue(keyVal T) (K, error) {
var zeroK K
key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return zeroK, err
}
Expand Down Expand Up @@ -749,7 +753,7 @@ func (cache *CacheDriver[T, K]) RetrieveValue(keyVal T) (K, error) {

// Contains returns true if the key is in the cache
func (cache *CacheDriver[T, K]) Contains(keyVal T) (bool, error) {
key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -781,7 +785,7 @@ func (cache *CacheDriver[T, K]) Contains(keyVal T) (bool, error) {
// Add other in retrieve from cache or compute
func (cache *CacheDriver[T, K]) StoreOrUpdate(keyVal T, newValue K) error {

key, err := cache.processor.ToMapKey(keyVal)
key, err := cache.toMapKey(keyVal)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 30e3a1d

Please sign in to comment.