✨ store and retrieve value functionalities added
freddy.cuellar committed Jul 13, 2023
1 parent 4d2784e commit 43fb98b
Showing 2 changed files with 216 additions and 18 deletions.
158 changes: 140 additions & 18 deletions cache.go
@@ -178,31 +178,31 @@ func (cache *CacheDriver[T, K]) Touch(keyVal T) error {
 }
 
 // Contains returns true if the cache contains keyVal. It does not update the entry timestamp and consequently it does not change the eviction order.
-func (cache *CacheDriver[T, K]) Contains(keyVal T) (bool, error) {
+// func (cache *CacheDriver[T, K]) Contains(keyVal T) (bool, error) {
 
-	key, err := cache.processor.ToMapKey(keyVal)
-	if err != nil {
-		return false, err
-	}
+// key, err := cache.processor.ToMapKey(keyVal)
+// if err != nil {
+// 	return false, err
+// }
 
-	cache.lock.Lock()
+// cache.lock.Lock()
 
-	if entry, ok := cache.table[key]; ok {
-		cache.lock.Unlock()
-		entry.lock.Lock()
-		defer entry.lock.Unlock()
+// if entry, ok := cache.table[key]; ok {
+// 	cache.lock.Unlock()
+// 	entry.lock.Lock()
+// 	defer entry.lock.Unlock()
 
-		if entry.state != AVAILABLE {
-			return true, nil
-		}
+// 	if entry.state != AVAILABLE {
+// 		return true, nil
+// 	}
 
-		return false, nil
-	}
+// 	return false, nil
+// }
 
-	cache.lock.Unlock()
+// cache.lock.Unlock()
 
-	return false, nil
-}
+// return false, nil
+// }
 
 // New Creates a new cache. Parameters are:
 //
@@ -696,3 +696,125 @@ func (cache *CacheDriver[T, K]) Set(capacity int, ttl time.Duration) error {

	return nil
}

// RetrieveValue returns the value stored under keyVal. If the key is not in
// the cache, or its entry does not hold a result yet, the zero value of K and
// a nil error are returned.
func (cache *CacheDriver[T, K]) RetrieveValue(keyVal T) (K, error) {
	var zeroK K
	key, err := cache.processor.ToMapKey(keyVal)
	if err != nil {
		return zeroK, err
	}

	cache.lock.Lock()
	if entry, ok := cache.table[key]; ok {
		cache.lock.Unlock()
		entry.lock.Lock()
		defer entry.lock.Unlock()

		if entry.state != AVAILABLE {
			return entry.postProcessedResponse, nil
		}
		return zeroK, nil
	}

	cache.lock.Unlock()

	return zeroK, nil
}

// Contains reports whether keyVal can be looked up without error. Note that
// RetrieveValue returns a nil error even when the key is absent, so Contains
// only returns false when the key itself cannot be processed.
func (cache *CacheDriver[T, K]) Contains(keyVal T) (bool, error) {
	_, err := cache.RetrieveValue(keyVal)
	if err != nil {
		return false, err
	}
	return true, nil
}

// StoreValue stores newValue under keyVal. If no entry exists for the key, a
// new one is allocated and marked COMPUTED. If the entry exists and is neither
// COMPUTING nor AVAILABLE, its value is replaced (and re-compressed when
// compression is enabled). It returns ErrEntryExpired for expired entries, and
// ErrEntryComputingState or ErrEntryAvailableState when the entry cannot be
// updated.
// TODO: consider accepting the same variadic `other` arguments that
// RetrieveFromCacheOrCompute takes.
func (cache *CacheDriver[T, K]) StoreValue(keyVal T, newValue K) error {

	key, err := cache.processor.ToMapKey(keyVal)
	if err != nil {
		return err
	}

	cache.lock.Lock()

	if entry, ok := cache.table[key]; ok {
		cache.lock.Unlock()
		entry.lock.Lock()
		defer entry.lock.Unlock()

		currentTime := time.Now()
		if entry.expirationTime.Before(currentTime) {
			return ErrEntryExpired
		}

		if entry.state != COMPUTING && entry.state != AVAILABLE {
			if cache.toCompress {
				buf, err := cache.transformer.ValueToBytes(newValue)
				if err != nil {
					return err
				}
				lz4Buf, err := cache.compressor.Compress(buf)
				if err != nil {
					return err
				}
				entry.postProcessedResponseCompressed = lz4Buf
			} else {
				entry.postProcessedResponse = newValue
			}
			// cache.lock.Lock()
			// cache.becomeMru(entry)
			// cache.lock.Unlock()
			return nil
		}

		if entry.state == AVAILABLE {
			return ErrEntryAvailableState
		}

		return ErrEntryComputingState
	} else {
		currTime := time.Now()
		entry, err = cache.allocateEntry(key, currTime)
		if err != nil {
			cache.lock.Unlock() // could not allocate a cache entry ==> report the error to the caller
			return err
		}

		entry.state = COMPUTING

		// TODO: decide whether this path should also increment cache.missCount
		// cache.missCount++
		cache.lock.Unlock() // release the global lock before taking the entry lock

		entry.lock.Lock() // other requests will wait until postProcessedResponse is set
		defer entry.lock.Unlock()

		retVal := newValue
		entry.state = COMPUTED

		if cache.toCompress {
			buf, err := cache.transformer.ValueToBytes(retVal) // transforms retVal into a []byte ready for compression
			if err != nil {
				entry.state = FAILED5xx
				entry.postProcessedResponse = retVal // fall back to storing the uncompressed value
				return nil
			}
			lz4Buf, err := cache.compressor.Compress(buf)
			if err != nil {
				entry.state = FAILED5xx
				entry.postProcessedResponse = retVal
			} else {
				entry.postProcessedResponseCompressed = lz4Buf
			}
			return nil
		}
		entry.postProcessedResponse = retVal
		entry.cond.Broadcast() // wake up any requests waiting for this entry's result
		return nil
	}

}
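Taken together, StoreValue, RetrieveValue and Contains let a caller write values into the cache directly and read them back without going through RetrieveFromCacheOrCompute. A minimal usage sketch follows. It is not part of this commit: the helper name demoStoreRetrieve is hypothetical, and it assumes a *CacheDriver[string, int] has already been constructed with New and a suitable ProcessorI implementation.

// demoStoreRetrieve is a hypothetical helper illustrating the new methods.
func demoStoreRetrieve(cache *CacheDriver[string, int]) (int, error) {
	// Write a value; StoreValue allocates the entry if the key is missing.
	if err := cache.StoreValue("answer", 42); err != nil {
		return 0, err
	}

	// Contains wraps RetrieveValue and only fails if the key cannot be processed.
	if ok, err := cache.Contains("answer"); err != nil || !ok {
		return 0, err
	}

	// RetrieveValue returns the stored value, or the zero value of int when
	// the entry is absent or holds no result yet.
	return cache.RetrieveValue("answer")
}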
76 changes: 76 additions & 0 deletions cache_test.go
@@ -630,6 +630,82 @@ func TestCacheDriver_Touch(t *testing.T) {
}
}

// TestCacheDriver_RetrieveValue inserts values with StoreValue (twice per key,
// to also exercise the update path) and then reads them back through
// RetrieveFromCacheOrCompute.
func TestCacheDriver_RetrieveValue(t *testing.T) {
	processor := mocks.NewProcessorI[int, int](t)
	cache := New[int, int](Capacity, .4, TTL, processor)

	elements := Capacity
	for i := 0; i < elements; i++ {
		processor.EXPECT().ToMapKey(i).Return(fmt.Sprint(i), nil).Times(2)
		err := cache.StoreValue(i, i)
		assert.Nil(t, err)
		err = cache.StoreValue(i, i)
		assert.Nil(t, err)
	}

	for i := 0; i < elements; i++ {
		processor.EXPECT().ToMapKey(i).Return(fmt.Sprint(i), nil).Times(1)
		val, err := cache.RetrieveFromCacheOrCompute(i)

		assert.Equal(t, i, val)
		assert.Nil(t, err)
	}
}

func TestCacheDriver_RetrieveValueConcurrentInsert(t *testing.T) {
	processor := mocks.NewProcessorI[int, int](t)
	cache := New[int, int](Capacity, .4, TTL, processor)

	elements := Capacity
	goroutines := 5
	wg := sync.WaitGroup{}
	wg.Add(goroutines * elements)
	for i := 0; i < elements; i++ {
		processor.EXPECT().ToMapKey(i).Return(fmt.Sprint(i), nil).Times(goroutines)
		for j := 0; j < goroutines; j++ {
			go func(i int, t *testing.T, wg *sync.WaitGroup) {
				err := cache.StoreValue(i, i)
				assert.Nil(t, err)
				wg.Done()
			}(i, t, &wg)
		}
	}
	wg.Wait()

	for i := 0; i < elements; i++ {
		processor.EXPECT().ToMapKey(i).Return(fmt.Sprint(i), nil).Times(1)
		val, err := cache.RetrieveFromCacheOrCompute(i)

		assert.Equal(t, i, val)
		assert.Nil(t, err)
	}
}

// TestCacheDriver_RetrieveValueConcurrentRetrieve inserts entries through the
// regular request path (insertEntry) and then reads them back with
// RetrieveValue.
func TestCacheDriver_RetrieveValueConcurrentRetrieve(t *testing.T) {
	processor := mocks.NewProcessorI[int, int](t)
	cache := New[int, int](Capacity, .4, TTL, processor)

	elements := Capacity
	for i := 0; i < elements; i++ {
		b, requestError := insertEntry(cache, processor, i)
		assert.Nil(t, requestError)
		assert.NotNil(t, b)
	}

	for i := 0; i < elements; i++ {
		processor.EXPECT().ToMapKey(i).Return(fmt.Sprint(i), nil).Times(1)
		val, err := cache.RetrieveValue(i)

		assert.Equal(t, i, val)
		assert.Nil(t, err)
	}
}
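The new Contains wrapper is not exercised by the tests above. A minimal sketch of such a test, not part of this commit, assuming the same mock setup and constants used in the existing tests:

// TestCacheDriver_Contains is a hypothetical test sketch for the Contains wrapper.
func TestCacheDriver_Contains(t *testing.T) {
	processor := mocks.NewProcessorI[int, int](t)
	cache := New[int, int](Capacity, .4, TTL, processor)

	// ToMapKey is called once by StoreValue and once by Contains (via RetrieveValue).
	processor.EXPECT().ToMapKey(1).Return(fmt.Sprint(1), nil).Times(2)

	err := cache.StoreValue(1, 1)
	assert.Nil(t, err)

	ok, err := cache.Contains(1)
	assert.Nil(t, err)
	assert.True(t, ok)
}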

//benchmark to test the performance of the cache
//when the processor should perform an addition task

