Skip to content

Commit

Permalink
Merge pull request #22 from geniussportsgroup/v2-update-deps
Browse files Browse the repository at this point in the history
V2 update deps
  • Loading branch information
dduartec authored Jun 14, 2024
2 parents 3aa2b09 + 925437f commit a8a3273
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 94 deletions.
134 changes: 107 additions & 27 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/geniussportsgroup/gateway_cache/v2/models"
"github.com/geniussportsgroup/gateway_cache/v2/reporter"
)

// State that a cache entry could have
Expand Down Expand Up @@ -54,6 +55,7 @@ type CacheDriver[K any, T any] struct {
missCount int
hitCount int
ttl time.Duration
ttlForNegative time.Duration
head CacheEntry[T] // sentinel header node
lock sync.Mutex
capacity int
Expand All @@ -63,6 +65,7 @@ type CacheDriver[K any, T any] struct {
processor ProcessorI[K, T]
transformer TransformerI[T]
compressor CompressorI
reporter Reporter
}

func (cache *CacheDriver[T, K]) MissCount() int {
Expand Down Expand Up @@ -95,6 +98,10 @@ func (cache *CacheDriver[T, K]) NumEntries() int {
return cache.numEntries
}

func (cache *CacheDriver[T, K]) TTLForNegative() time.Duration {
return cache.ttlForNegative
}

// LazyRemove removes the entry with keyVal from the cache. It does not remove the entry immediately, but it marks it as removed.
func (cache *CacheDriver[T, K]) LazyRemove(keyVal T) error {

Expand Down Expand Up @@ -192,12 +199,16 @@ func (cache *CacheDriver[T, K]) Touch(keyVal T) error {
// ToMapKey(keyVal T) (string, error) //Is the function in charge of transforming the request into a string
// CacheMissSolver(K) (T, *models.RequestError) //Is the function in charge of getting the value in case that does not exist in the cache
// }

type Options[K, T any] func(*CacheDriver[K, T])

func New[K any, T any](
capacity int,
capFactor float64,
ttl time.Duration,
ttlForNegative time.Duration,
processor ProcessorI[K, T],

options ...Options[K, T],
) *CacheDriver[K, T] {

if capFactor < 0.1 || capFactor > 3.0 {
Expand All @@ -213,13 +224,19 @@ func New[K any, T any](
extendedCapacity: int(extendedCapacity),
numEntries: 0,
ttl: ttl,
ttlForNegative: ttlForNegative,
table: make(map[string]*CacheEntry[T], int(extendedCapacity)),
processor: processor,
compressor: lz4Compressor{},
reporter: &reporter.Default{},
}
ret.head.prev = &ret.head
ret.head.next = &ret.head

for _, option := range options {
option(ret)
}

return ret
}

Expand Down Expand Up @@ -276,11 +293,13 @@ func NewWithCompression[T any, K any](
capacity int,
capFactor float64,
ttl time.Duration,
ttlForNegative time.Duration,
processor ProcessorI[T, K],
compressor TransformerI[K],
options ...Options[T, K],
) (cache *CacheDriver[T, K]) {

cache = New(capacity, capFactor, ttl, processor)
cache = New(capacity, capFactor, ttl, ttlForNegative, processor, options...)
if cache != nil {
cache.toCompress = true
cache.transformer = compressor
Expand All @@ -289,6 +308,10 @@ func NewWithCompression[T any, K any](
return cache
}

func (cache *CacheDriver[T, K]) SetReporter(reporter Reporter) {
cache.reporter = reporter
}

// Insert entry as the first item of cache (mru)
func (cache *CacheDriver[T, K]) insertAsMru(entry *CacheEntry[K]) {
entry.prev = &cache.head
Expand Down Expand Up @@ -397,7 +420,8 @@ func (cache *CacheDriver[T, K]) allocateEntry(
// immediately returns the cached entry. If the request is the first, then it blocks until the result is
// ready. If the request is not the first but the result is not still ready, then it blocks
// until the result is ready
func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T) (K, *models.RequestError) {
func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T,
other ...interface{}) (K, *models.RequestError) {

var requestError *models.RequestError
var zeroK K
Expand All @@ -421,25 +445,29 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T) (K, *model
entry, hit = cache.table[cacheKey]
if hit && currTime.Before(entry.expirationTime) {
cache.hitCount++
cache.becomeMru(entry)
go cache.reporter.ReportHit()
cache.becomeMru(entry) //TODO: check if it is negative
cache.lock.Unlock()

entry.lock.Lock() // will block if it is computing
for entry.state == COMPUTING { // this guard is for protection; it should never be true
entry.cond.Wait() // it will wake up when result arrives
}
defer entry.lock.Unlock()

if entry.state == FAILED5xx {
// entry.expirationTime = currTime.Add(cache.ttlForNegative)
return zeroK, &models.RequestError{
Error: entry.err,
Code: Status5xxCached, // include 4xx and 5xx
}
} else if entry.state == FAILED4xx {
// entry.expirationTime = currTime.Add(cache.ttlForNegative)
return zeroK, &models.RequestError{
Error: entry.err,
Code: Status4xxCached, // include 4xx and 5xx
}
}

entry.timestamp = currTime
entry.expirationTime = currTime.Add(cache.ttl)

Expand Down Expand Up @@ -472,18 +500,18 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T) (K, *model
if err != nil {
cache.lock.Unlock() // an error getting cache entry ==> we invoke directly the uservice
// return cache.callUServices(request, payload, other...)
return cache.processor.CacheMissSolver(request)
return cache.processor.CacheMissSolver(request, other...)
}

entry.state = COMPUTING

go cache.reporter.ReportMiss()
cache.missCount++
cache.lock.Unlock() // release global lock before to take the entry lock

entry.lock.Lock() // other requests will wait for until postProcessedResponse is gotten
entry.lock.Lock() // other requests will wait for until postProcessedResponse is gotten
defer entry.lock.Unlock()

retVal, requestError := cache.processor.CacheMissSolver(request)
retVal, requestError := cache.processor.CacheMissSolver(request, other...)
if requestError != nil {
switch requestError.Code {
case Status4xx, Status4xxCached:
Expand All @@ -494,6 +522,7 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T) (K, *model
entry.state = FAILED5XXMISSHANDLERERROR
}
entry.err = requestError.Error
entry.expirationTime = currTime.Add(cache.ttlForNegative)
} else {
entry.state = COMPUTED
}
Expand All @@ -507,6 +536,7 @@ func (cache *CacheDriver[T, K]) RetrieveFromCacheOrCompute(request T) (K, *model
if err != nil {
entry.state = FAILED5xx
entry.postProcessedResponse = retVal
entry.expirationTime = currTime.Add(cache.ttlForNegative)
} else {
entry.postProcessedResponseCompressed = lz4Buf
}
Expand Down Expand Up @@ -580,11 +610,12 @@ func (it *CacheIt[T, K]) Next() *CacheEntry[K] {
}

type CacheState struct {
MissCount int
HitCount int
TTL time.Duration
Capacity int
NumEntries int
MissCount int
HitCount int
TTL time.Duration
TTLForNegative time.Duration
Capacity int
NumEntries int
}

// GetState Return a json containing the cache state. Use the internal mutex. Be careful with a deadlock
Expand All @@ -594,11 +625,12 @@ func (cache *CacheDriver[T, K]) GetState() (string, error) {
defer cache.lock.Unlock()

state := CacheState{
MissCount: cache.missCount,
HitCount: cache.hitCount,
TTL: cache.ttl,
Capacity: cache.capacity,
NumEntries: cache.numEntries,
MissCount: cache.missCount,
HitCount: cache.hitCount,
TTL: cache.ttl,
TTLForNegative: cache.ttlForNegative,
Capacity: cache.capacity,
NumEntries: cache.numEntries,
}

buf, err := json.MarshalIndent(&state, "", " ")
Expand Down Expand Up @@ -644,7 +676,7 @@ func (cache *CacheDriver[T, K]) Clean() error {
return cache.clean()
}

func (cache *CacheDriver[T, K]) Set(capacity int, ttl time.Duration) error {
func (cache *CacheDriver[T, K]) Set(capacity int, ttl time.Duration, ttlForNegative time.Duration) error {

cache.lock.Lock()
defer cache.lock.Unlock()
Expand All @@ -656,15 +688,24 @@ func (cache *CacheDriver[T, K]) Set(capacity int, ttl time.Duration) error {
cache.capacity = capacity
}

if ttl != 0 {
if ttl != 0 && ttlForNegative != 0 {
var ttlToAdd time.Duration
for it := cache.NewCacheIt(); it.HasCurr(); it.Next() {
entry := it.GetCurr()
if entry.state != COMPUTED {
ttlToAdd = ttl
if entry.state != COMPUTED { //TODO: here is possible add negative ttl in case we want add it here
continue
}
entry.expirationTime = entry.timestamp.Add(ttl)

if entry.state == FAILED4xx || entry.state == FAILED5XXMISSHANDLERERROR || entry.state == FAILED5xx {
ttlToAdd = ttlForNegative
}

entry.expirationTime = entry.timestamp.Add(ttlToAdd)

}
cache.ttl = ttl
cache.ttlForNegative = ttlForNegative
}

return nil
Expand All @@ -678,14 +719,32 @@ func (cache *CacheDriver[T, K]) RetrieveValue(keyVal T) (K, error) {
}

cache.lock.Lock()
if entry, ok := cache.table[key]; ok {
if entry, ok := cache.table[key]; ok && entry.expirationTime.After(time.Now()) {
cache.lock.Unlock()
entry.lock.Lock()
defer entry.lock.Unlock()

for entry.state == COMPUTING { // this guard is for protection; it should never be true
entry.cond.Wait() // it will wake up when result arrives
}

if entry.state != AVAILABLE {
return entry.postProcessedResponse, nil
answer := entry.postProcessedResponse
if cache.toCompress {
buf, err := cache.compressor.Decompress(entry.postProcessedResponseCompressed)
if err != nil {
return zeroK, err
}

answer, err = cache.transformer.BytesToValue(buf)
if err != nil {
return zeroK, err
}

}
return answer, nil
}

return zeroK, nil

}
Expand All @@ -697,11 +756,32 @@ func (cache *CacheDriver[T, K]) RetrieveValue(keyVal T) (K, error) {

// Contains returns true if the key is in the cache
func (cache *CacheDriver[T, K]) Contains(keyVal T) (bool, error) {
_, err := cache.RetrieveValue(keyVal)
key, err := cache.processor.ToMapKey(keyVal)
if err != nil {
return false, err
}
return true, nil
cache.lock.Lock()

if entry, ok := cache.table[key]; ok && entry.expirationTime.After(time.Now()) {
cache.lock.Unlock()
entry.lock.Lock()
defer entry.lock.Unlock()

for entry.state == COMPUTING { // this guard is for protection; it should never be true
entry.cond.Wait() // it will wake up when result arrives
}

if entry.state != AVAILABLE {
return true, nil
}

return false, nil

}

cache.lock.Unlock()

return false, nil
}

// testing
Expand Down
Loading

0 comments on commit a8a3273

Please sign in to comment.