Skip to content

Commit

Permalink
Revert changes to caching library used in transformer (#433)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->

#418 replaced the in-memory
caching library used in the code with `github.com/patrickmn/go-cache`
everywhere. However, this could be problematic for the transformer, as
indicated by a review comment on #418.
So, this PR reverts the changes to the caching library for the
transformer, so it can continue to rely on
`github.com/coocood/freecache`, which can bound the memory allocated for
the caching.

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
Replace Transformer caching library. Support for the transformer env var config CACHE_SIZE_IN_MB is now re-enabled.
```

**Checklist**

- [x] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python clients if the PR introduces API
changes
  • Loading branch information
krithika369 authored Jul 19, 2023
1 parent d369399 commit 2a70a79
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 87 deletions.
1 change: 1 addition & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
github.com/coocood/freecache v1.2.3
github.com/fatih/color v1.13.0
github.com/feast-dev/feast/sdk/go v0.9.4
github.com/fraugster/parquet-go v0.10.0
Expand Down
2 changes: 2 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ github.com/containernetworking/plugins v0.9.1/go.mod h1:xP/idU2ldlzN6m4p5LmGiwRD
github.com/containers/ocicrypt v1.0.1/go.mod h1:MeJDzk1RJHv89LjsH0Sp5KTY3ZYkjXO/C+bKAeWFIrc=
github.com/containers/ocicrypt v1.1.0/go.mod h1:b8AOe0YR67uU8OqfVNcznfFpAzu3rdgUV4GP9qXPfu4=
github.com/containers/ocicrypt v1.1.1/go.mod h1:Dm55fwWm1YZAjYRaJ94z2mfZikIyIN4B0oB3dj3jFxY=
github.com/coocood/freecache v1.2.3 h1:lcBwpZrwBZRZyLk/8EMyQVXRiFl663cCuMOrjCALeto=
github.com/coocood/freecache v1.2.3/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk=
github.com/coredns/corefile-migration v1.0.2/go.mod h1:OFwBp/Wc9dJt5cAZzHWMNhK1r5L0p0jDwIBc6j8NC8E=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down
20 changes: 10 additions & 10 deletions api/pkg/transformer/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ package cache
import (
"time"

cache "github.com/patrickmn/go-cache"
"github.com/coocood/freecache"
)

type Cache interface {
Insert(key string, value interface{}, ttl time.Duration)
Fetch(key string) (interface{}, bool)
Insert(key []byte, value []byte, ttl time.Duration) error
Fetch(key []byte) ([]byte, error)
}

type inMemoryCache struct {
cache *cache.Cache
cache *freecache.Cache
}

const (
cacheCleanUpSeconds = 300
MB = 1024 * 1024
)

func NewInMemoryCache() *inMemoryCache {
executor := cache.New(0, cacheCleanUpSeconds*time.Second)
// NewInMemoryCache builds an inMemoryCache backed by a freecache.Cache.
// sizeInMB is the requested cache capacity in megabytes; it is converted to
// bytes (sizeInMB * MB) before being handed to freecache.
func NewInMemoryCache(sizeInMB int) *inMemoryCache {
	sizeInBytes := sizeInMB * MB
	return &inMemoryCache{
		cache: freecache.NewCache(sizeInBytes),
	}
}

func (c *inMemoryCache) Insert(key string, value interface{}, ttl time.Duration) {
c.cache.Set(key, value, ttl)
// Insert stores value under key in the in-memory cache with the given ttl.
//
// The underlying freecache store takes its expiry in whole seconds and treats
// an expiry <= 0 as "never expire". A positive ttl is therefore rounded up to
// the next whole second, so that a sub-second ttl (e.g. 500ms) still expires
// instead of being truncated to 0 and kept indefinitely. A zero or negative
// ttl is converted as before and keeps the no-expiry behaviour.
//
// It returns the error reported by the underlying cache's Set call.
func (c *inMemoryCache) Insert(key []byte, value []byte, ttl time.Duration) error {
	expireSeconds := int(ttl / time.Second)
	// Round up any positive fractional remainder; plain integer division
	// would silently turn e.g. 999ms into 0 (= no expiry).
	if ttl > 0 && ttl%time.Second != 0 {
		expireSeconds++
	}
	return c.cache.Set(key, value, expireSeconds)
}

func (c *inMemoryCache) Fetch(key string) (interface{}, bool) {
// Fetch looks up key in the in-memory cache and returns the stored bytes.
// The value and error are passed through unchanged from the underlying
// freecache Get call.
func (c *inMemoryCache) Fetch(key []byte) ([]byte, error) {
	value, err := c.cache.Get(key)
	return value, err
}
59 changes: 38 additions & 21 deletions api/pkg/transformer/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cache

import (
"encoding/json"
"reflect"
"testing"
"time"

"github.com/coocood/freecache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -12,33 +15,40 @@ func TestCache(t *testing.T) {
testCases := []struct {
desc string
data interface{}
key string
key []byte
}{
{
desc: "Success - 1",
data: map[string]string{"key": "value"},
key: "key1",
key: []byte("key1"),
},
{
desc: "Success - 2",
data: map[string]string{"key": "value"},
key: "key2",
key: []byte("key2"),
},
{
desc: "Success - 3",
data: []int{1, 2, 3},
key: "key3",
key: []byte("key3"),
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cache := NewInMemoryCache()
cache.Insert(tC.key, tC.data, 2*time.Second)
dataByte, err := json.Marshal(tC.data)
require.NoError(t, err)
cache := NewInMemoryCache(1)
err = cache.Insert(tC.key, dataByte, 1)
assert.NoError(t, err)

cachedValue, ok := cache.Fetch(tC.key)
require.True(t, ok)
cachedValue, err := cache.Fetch(tC.key)
require.NoError(t, err)

assert.Equal(t, tC.data, cachedValue)
var val interface{}
err = json.Unmarshal(cachedValue, &val)
require.NoError(t, err)

reflect.DeepEqual(tC.data, val)
})
}
}
Expand All @@ -47,38 +57,45 @@ func TestCache_Expiry(t *testing.T) {
testCases := []struct {
desc string
data interface{}
key string
key []byte
delayOfFetching int
foundInCache bool
err error
}{
{
desc: "Success - Not expired",
data: map[string]string{"key": "value"},
key: "key1",
key: []byte("key1"),
delayOfFetching: 0,
foundInCache: true,
},
{
desc: "Success - Expired",
data: map[string]string{"key": "value"},
key: "key1",
delayOfFetching: 3,
key: []byte("key1"),
delayOfFetching: 2,
err: freecache.ErrNotFound,
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cache := NewInMemoryCache()
cache.Insert(tC.key, tC.data, 2*time.Second)
dataByte, err := json.Marshal(tC.data)
require.NoError(t, err)
cache := NewInMemoryCache(1)
err = cache.Insert(tC.key, dataByte, 1*time.Second)
assert.NoError(t, err)

if tC.delayOfFetching > 0 {
time.Sleep(time.Duration(tC.delayOfFetching) * time.Second)
}

cachedValue, ok := cache.Fetch(tC.key)
require.Equal(t, tC.foundInCache, ok)
cachedValue, err := cache.Fetch(tC.key)
assert.Equal(t, tC.err, err)

if err == nil {
var val interface{}
err = json.Unmarshal(cachedValue, &val)
require.NoError(t, err)

if ok {
assert.Equal(t, tC.data, cachedValue)
reflect.DeepEqual(tC.data, val)
}

})
Expand Down
9 changes: 5 additions & 4 deletions api/pkg/transformer/executor/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,11 @@ func TestStandardTransformer_Execute(t *testing.T) {
require.NoError(t, err)

compiler := pipeline.NewCompiler(symbol.NewRegistry(), feastClients, &feast.Options{
CacheEnabled: true,
CacheTTL: 60 * time.Second,
BatchSize: 100,
FeastTimeout: 1 * time.Second,
CacheEnabled: true,
CacheSizeInMB: 100,
CacheTTL: 60 * time.Second,
BatchSize: 100,
FeastTimeout: 1 * time.Second,

DefaultFeastSource: spec.ServingSource_BIGTABLE,
StorageConfigs: feast.FeastStorageConfig{
Expand Down
19 changes: 11 additions & 8 deletions api/pkg/transformer/feast/feature_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type featureCache struct {
ttl time.Duration
}

func newFeatureCache(ttl time.Duration) *featureCache {
func newFeatureCache(ttl time.Duration, sizeInMB int) *featureCache {
return &featureCache{
cache: cache.NewInMemoryCache(),
cache: cache.NewInMemoryCache(sizeInMB),
ttl: ttl,
}
}
Expand Down Expand Up @@ -74,15 +74,15 @@ func (fc *featureCache) fetchFeatureTable(entities []feast.Row, columnNames []st
}

feastCacheRetrievalCount.Inc()
val, ok := fc.cache.Fetch(string(keyByte))
if !ok {
val, err := fc.cache.Fetch(keyByte)
if err != nil {
entityNotInCache = append(entityNotInCache, entity)
continue
}

feastCacheHitCount.Inc()
cacheValue, ok := val.(CacheValue)
if !ok {
var cacheValue CacheValue
if err := json.Unmarshal(val, &cacheValue); err != nil {
entityNotInCache = append(entityNotInCache, entity)
continue
}
Expand Down Expand Up @@ -136,8 +136,11 @@ func (fc *featureCache) insertFeaturesOfEntity(entity feast.Row, columnNames []s
ValueRow: value,
ValueTypes: valueTypes,
}
fc.cache.Insert(string(keyByte), cacheValue, fc.ttl)
return nil
dataByte, err := json.Marshal(cacheValue)
if err != nil {
return err
}
return fc.cache.Insert(keyByte, dataByte, fc.ttl)
}

func castValueRow(row types.ValueRow, columnTypes []feastTypes.ValueType_Enum) (types.ValueRow, error) {
Expand Down
28 changes: 18 additions & 10 deletions api/pkg/transformer/feast/feature_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

func TestFeatureCache_FetchFeatureTable(t *testing.T) {
type cacheConfig struct {
ttl time.Duration
ttl time.Duration
sizeInMB int
}
type args struct {
entities []feast.Row
Expand All @@ -31,7 +32,8 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {
{
name: "single entity, no value in cache",
cacheConfig: cacheConfig{
ttl: 10 * time.Minute,
ttl: 10 * time.Minute,
sizeInMB: 10,
},
args: args{
entities: []feast.Row{
Expand All @@ -58,7 +60,8 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {
{
name: "two entities, no value in cache",
cacheConfig: cacheConfig{
ttl: 10 * time.Minute,
ttl: 10 * time.Minute,
sizeInMB: 10,
},
args: args{
entities: []feast.Row{
Expand Down Expand Up @@ -91,7 +94,8 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {
{
name: "two entities, only one has value in cache",
cacheConfig: cacheConfig{
ttl: 10 * time.Minute,
ttl: 10 * time.Minute,
sizeInMB: 10,
},
args: args{
entities: []feast.Row{
Expand Down Expand Up @@ -144,7 +148,8 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {
{
name: "two entities, both have value in cache",
cacheConfig: cacheConfig{
ttl: 10 * time.Minute,
ttl: 10 * time.Minute,
sizeInMB: 10,
},
args: args{
entities: []feast.Row{
Expand Down Expand Up @@ -207,7 +212,8 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {
{
name: "one entity, but cache contain different list of feature than requested",
cacheConfig: cacheConfig{
ttl: 10 * time.Minute,
ttl: 10 * time.Minute,
sizeInMB: 10,
},
args: args{
entities: []feast.Row{
Expand Down Expand Up @@ -252,7 +258,7 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fc := newFeatureCache(tt.cacheConfig.ttl)
fc := newFeatureCache(tt.cacheConfig.ttl, tt.cacheConfig.sizeInMB)
if tt.valueInCache != nil {
err := fc.insertFeatureTable(tt.valueInCache, tt.args.project)
if err != nil {
Expand All @@ -269,7 +275,8 @@ func TestFeatureCache_FetchFeatureTable(t *testing.T) {

func TestFeatureCache_InsertFeatureTable(t *testing.T) {
type cacheConfig struct {
ttl time.Duration
ttl time.Duration
sizeInMB int
}
type args struct {
featureTable *internalFeatureTable
Expand All @@ -285,7 +292,8 @@ func TestFeatureCache_InsertFeatureTable(t *testing.T) {
{
name: "insert table containing one entity",
cacheConfig: cacheConfig{
ttl: 10 * time.Minute,
ttl: 10 * time.Minute,
sizeInMB: 10,
},
args: args{
featureTable: &internalFeatureTable{
Expand All @@ -310,7 +318,7 @@ func TestFeatureCache_InsertFeatureTable(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fc := newFeatureCache(tt.cacheConfig.ttl)
fc := newFeatureCache(tt.cacheConfig.ttl, tt.cacheConfig.sizeInMB)
err := fc.insertFeatureTable(tt.args.featureTable, tt.args.project)
if err != nil {
if !tt.wantErr {
Expand Down
4 changes: 3 additions & 1 deletion api/pkg/transformer/feast/feature_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewFeastRetriever(
return &FeastRetriever{
feastClients: feastClients,
entityExtractor: entityExtractor,
featureCache: newFeatureCache(options.CacheTTL),
featureCache: newFeatureCache(options.CacheTTL, options.CacheSizeInMB),
featureTableSpecs: featureTableSpecs,
defaultValues: defaultValues,
options: options,
Expand All @@ -95,6 +95,8 @@ type Options struct {
CacheEnabled bool `envconfig:"FEAST_CACHE_ENABLED" default:"true"`
// Duration for which a cached entry lives and can be served as a response
CacheTTL time.Duration `envconfig:"FEAST_CACHE_TTL" default:"60s"`
// Maximum size of the in-memory cache, in megabytes
CacheSizeInMB int `envconfig:"CACHE_SIZE_IN_MB" default:"100"`

// Timeout of feast request
FeastTimeout time.Duration `envconfig:"FEAST_TIMEOUT" default:"1s"`
Expand Down
2 changes: 2 additions & 0 deletions api/pkg/transformer/feast/feature_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,7 @@ func TestFeatureRetriever_RetrieveFeatureOfEntityInRequest(t *testing.T) {
FeastClientHystrixCommandName: "TestFeatureRetriever_RetrieveFeatureOfEntityInRequest",
FeastClientMaxConcurrentRequests: 2,
CacheEnabled: true,
CacheSizeInMB: 100,
CacheTTL: 10 * time.Minute,
FeastTimeout: 1 * time.Second,
},
Expand Down Expand Up @@ -2644,6 +2645,7 @@ func TestFeatureRetriever_RetrieveFeatureOfEntityInRequest_FeastTimeout(t *testi
FeastClientHystrixCommandName: "TestFeatureRetriever_RetrieveFeatureOfEntityInRequest_FeastTimeout",
FeastClientMaxConcurrentRequests: 100,
CacheEnabled: true,
CacheSizeInMB: 100,
CacheTTL: 10 * time.Minute,
},
logger,
Expand Down
Loading

0 comments on commit 2a70a79

Please sign in to comment.