Skip to content

Commit

Permalink
chore: Update cache library (#418)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->

This PR replaces the use of `"github.com/coocood/freecache"` in the
codebase with `"github.com/patrickmn/go-cache"` for the following
reasons:
* The `freecache` library needs us to declare the size of the cache
ahead of time. `go-cache`, on the other hand, is map-based and simpler.
* The `freecache` library requires each entry to be less than 1/1024 of
cache size. This would require us to calculate the cache size ahead of
time and does not allow for the occasional large-sized entries.
`go-cache` has no such restriction.

With this change, the transformer config `CACHE_SIZE_IN_MB` is no longer
required and has been removed.

**Other changes**:

`.env.sample` is replaced by `env-config.yaml`, based on the changes
introduced by #397

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
Replace Project and Transformer caching library with one that can grow flexibly. Support for the transformer env var config CACHE_SIZE_IN_MB is now removed and if set, will be unused.
```

**Checklist**

- [x] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduce API
changes
  • Loading branch information
krithika369 authored Jul 6, 2023
1 parent 216b2eb commit c919cb1
Show file tree
Hide file tree
Showing 19 changed files with 200 additions and 272 deletions.
69 changes: 0 additions & 69 deletions .env.sample

This file was deleted.

5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
include .env.sample
export

BIN_NAME=merlin
TRANSFORMER_BIN_NAME=merlin-transformer
INFERENCE_LOGGER_BIN_NAME=merlin-logger
Expand Down Expand Up @@ -129,7 +126,7 @@ build-inference-logger:
.PHONY: run
run:
@echo "> Running application ..."
@./bin/${BIN_NAME}
@./bin/${BIN_NAME} -config ./config.yaml

.PHONY: run-ui
run-ui:
Expand Down
4 changes: 3 additions & 1 deletion api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ make setup

To start Merlin locally you have to:

- Replace `VAULT_TOKEN` value in .env.sample
```bash
make build
```

Then execute:

Expand Down
3 changes: 0 additions & 3 deletions api/cmd/transformer/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
include .env.sample
export

BIN_NAME=feast-transformer
BRANCH=$(shell git rev-parse --abbrev-ref HEAD)
REVISION=$(shell git rev-parse HEAD)
Expand Down
3 changes: 1 addition & 2 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
github.com/coocood/freecache v1.1.1
github.com/fatih/color v1.13.0
github.com/feast-dev/feast/sdk/go v0.9.4
github.com/fraugster/parquet-go v0.10.0
Expand Down Expand Up @@ -52,6 +51,7 @@ require (
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
github.com/ory/viper v1.7.5
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pilagod/gorm-cursor-paginator v1.3.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
Expand Down Expand Up @@ -184,7 +184,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/ory/keto-client-go v0.4.4-alpha.1 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/princjef/gomarkdoc v0.4.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,6 @@ github.com/containernetworking/plugins v0.9.1/go.mod h1:xP/idU2ldlzN6m4p5LmGiwRD
github.com/containers/ocicrypt v1.0.1/go.mod h1:MeJDzk1RJHv89LjsH0Sp5KTY3ZYkjXO/C+bKAeWFIrc=
github.com/containers/ocicrypt v1.1.0/go.mod h1:b8AOe0YR67uU8OqfVNcznfFpAzu3rdgUV4GP9qXPfu4=
github.com/containers/ocicrypt v1.1.1/go.mod h1:Dm55fwWm1YZAjYRaJ94z2mfZikIyIN4B0oB3dj3jFxY=
github.com/coocood/freecache v1.1.1 h1:uukNF7QKCZEdZ9gAV7WQzvh0SbjwdMF6m3x3rxEkaPc=
github.com/coocood/freecache v1.1.1/go.mod h1:OKrEjkGVoxZhyWAJoeFi5BMLUJm2Tit0kpGkIr7NGYY=
github.com/coredns/corefile-migration v1.0.2/go.mod h1:OFwBp/Wc9dJt5cAZzHWMNhK1r5L0p0jDwIBc6j8NC8E=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down
20 changes: 10 additions & 10 deletions api/pkg/transformer/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ package cache
import (
"time"

"github.com/coocood/freecache"
cache "github.com/patrickmn/go-cache"
)

type Cache interface {
Insert(key []byte, value []byte, ttl time.Duration) error
Fetch(key []byte) ([]byte, error)
Insert(key string, value interface{}, ttl time.Duration)
Fetch(key string) (interface{}, bool)
}

type inMemoryCache struct {
cache *freecache.Cache
cache *cache.Cache
}

const (
MB = 1024 * 1024
cacheCleanUpSeconds = 300
)

func NewInMemoryCache(sizeInMB int) *inMemoryCache {
executor := freecache.NewCache(sizeInMB * MB)
func NewInMemoryCache() *inMemoryCache {
executor := cache.New(0, cacheCleanUpSeconds*time.Second)
return &inMemoryCache{cache: executor}
}

func (c *inMemoryCache) Insert(key []byte, value []byte, ttl time.Duration) error {
return c.cache.Set(key, value, int(ttl/time.Second))
func (c *inMemoryCache) Insert(key string, value interface{}, ttl time.Duration) {
c.cache.Set(key, value, ttl)
}

func (c *inMemoryCache) Fetch(key []byte) ([]byte, error) {
func (c *inMemoryCache) Fetch(key string) (interface{}, bool) {
return c.cache.Get(key)
}
59 changes: 21 additions & 38 deletions api/pkg/transformer/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package cache

import (
"encoding/json"
"reflect"
"testing"
"time"

"github.com/coocood/freecache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -15,40 +12,33 @@ func TestCache(t *testing.T) {
testCases := []struct {
desc string
data interface{}
key []byte
key string
}{
{
desc: "Success - 1",
data: map[string]string{"key": "value"},
key: []byte("key1"),
key: "key1",
},
{
desc: "Success - 2",
data: map[string]string{"key": "value"},
key: []byte("key2"),
key: "key2",
},
{
desc: "Success - 3",
data: []int{1, 2, 3},
key: []byte("key3"),
key: "key3",
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
dataByte, err := json.Marshal(tC.data)
require.NoError(t, err)
cache := NewInMemoryCache(1)
err = cache.Insert(tC.key, dataByte, 1)
assert.NoError(t, err)
cache := NewInMemoryCache()
cache.Insert(tC.key, tC.data, 2*time.Second)

cachedValue, err := cache.Fetch(tC.key)
require.NoError(t, err)
cachedValue, ok := cache.Fetch(tC.key)
require.True(t, ok)

var val interface{}
err = json.Unmarshal(cachedValue, &val)
require.NoError(t, err)

reflect.DeepEqual(tC.data, val)
assert.Equal(t, tC.data, cachedValue)
})
}
}
Expand All @@ -57,45 +47,38 @@ func TestCache_Expiry(t *testing.T) {
testCases := []struct {
desc string
data interface{}
key []byte
key string
delayOfFetching int
err error
foundInCache bool
}{
{
desc: "Success - Not expired",
data: map[string]string{"key": "value"},
key: []byte("key1"),
key: "key1",
delayOfFetching: 0,
foundInCache: true,
},
{
desc: "Success - Expired",
data: map[string]string{"key": "value"},
key: []byte("key1"),
delayOfFetching: 2,
err: freecache.ErrNotFound,
key: "key1",
delayOfFetching: 3,
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
dataByte, err := json.Marshal(tC.data)
require.NoError(t, err)
cache := NewInMemoryCache(1)
err = cache.Insert(tC.key, dataByte, 1*time.Second)
assert.NoError(t, err)
cache := NewInMemoryCache()
cache.Insert(tC.key, tC.data, 2*time.Second)

if tC.delayOfFetching > 0 {
time.Sleep(time.Duration(tC.delayOfFetching) * time.Second)
}

cachedValue, err := cache.Fetch(tC.key)
assert.Equal(t, tC.err, err)

if err == nil {
var val interface{}
err = json.Unmarshal(cachedValue, &val)
require.NoError(t, err)
cachedValue, ok := cache.Fetch(tC.key)
require.Equal(t, tC.foundInCache, ok)

reflect.DeepEqual(tC.data, val)
if ok {
assert.Equal(t, tC.data, cachedValue)
}

})
Expand Down
9 changes: 4 additions & 5 deletions api/pkg/transformer/executor/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,10 @@ func TestStandardTransformer_Execute(t *testing.T) {
require.NoError(t, err)

compiler := pipeline.NewCompiler(symbol.NewRegistry(), feastClients, &feast.Options{
CacheEnabled: true,
CacheSizeInMB: 100,
CacheTTL: 60 * time.Second,
BatchSize: 100,
FeastTimeout: 1 * time.Second,
CacheEnabled: true,
CacheTTL: 60 * time.Second,
BatchSize: 100,
FeastTimeout: 1 * time.Second,

DefaultFeastSource: spec.ServingSource_BIGTABLE,
StorageConfigs: feast.FeastStorageConfig{
Expand Down
19 changes: 8 additions & 11 deletions api/pkg/transformer/feast/feature_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type featureCache struct {
ttl time.Duration
}

func newFeatureCache(ttl time.Duration, sizeInMB int) *featureCache {
func newFeatureCache(ttl time.Duration) *featureCache {
return &featureCache{
cache: cache.NewInMemoryCache(sizeInMB),
cache: cache.NewInMemoryCache(),
ttl: ttl,
}
}
Expand Down Expand Up @@ -74,15 +74,15 @@ func (fc *featureCache) fetchFeatureTable(entities []feast.Row, columnNames []st
}

feastCacheRetrievalCount.Inc()
val, err := fc.cache.Fetch(keyByte)
if err != nil {
val, ok := fc.cache.Fetch(string(keyByte))
if !ok {
entityNotInCache = append(entityNotInCache, entity)
continue
}

feastCacheHitCount.Inc()
var cacheValue CacheValue
if err := json.Unmarshal(val, &cacheValue); err != nil {
cacheValue, ok := val.(CacheValue)
if !ok {
entityNotInCache = append(entityNotInCache, entity)
continue
}
Expand Down Expand Up @@ -136,11 +136,8 @@ func (fc *featureCache) insertFeaturesOfEntity(entity feast.Row, columnNames []s
ValueRow: value,
ValueTypes: valueTypes,
}
dataByte, err := json.Marshal(cacheValue)
if err != nil {
return err
}
return fc.cache.Insert(keyByte, dataByte, fc.ttl)
fc.cache.Insert(string(keyByte), cacheValue, fc.ttl)
return nil
}

func castValueRow(row types.ValueRow, columnTypes []feastTypes.ValueType_Enum) (types.ValueRow, error) {
Expand Down
Loading

0 comments on commit c919cb1

Please sign in to comment.