diff --git a/core/pkg/evaluator/json.go b/core/pkg/evaluator/json.go index 26743f1b2..419161a46 100644 --- a/core/pkg/evaluator/json.go +++ b/core/pkg/evaluator/json.go @@ -121,11 +121,11 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e switch payload.Type { case sync.ALL: - events, reSync = je.store.Merge(je.Logger, payload.Source, newFlags.Flags) + events, reSync = je.store.Merge(je.Logger, payload.Source, payload.Selector, newFlags.Flags) case sync.ADD: - events = je.store.Add(je.Logger, payload.Source, newFlags.Flags) + events = je.store.Add(je.Logger, payload.Source, payload.Selector, newFlags.Flags) case sync.UPDATE: - events = je.store.Update(je.Logger, payload.Source, newFlags.Flags) + events = je.store.Update(je.Logger, payload.Source, payload.Selector, newFlags.Flags) case sync.DELETE: events = je.store.DeleteFlags(je.Logger, payload.Source, newFlags.Flags) default: diff --git a/core/pkg/evaluator/json_test.go b/core/pkg/evaluator/json_test.go index 0320521f2..ae5a94f56 100644 --- a/core/pkg/evaluator/json_test.go +++ b/core/pkg/evaluator/json_test.go @@ -905,6 +905,7 @@ func TestState_Evaluator(t *testing.T) { "defaultVariant": "recursive", "state": "ENABLED", "source":"", + "selector":"", "targeting": { "if": [ { @@ -965,6 +966,7 @@ func TestState_Evaluator(t *testing.T) { "defaultVariant": "recursive", "state": "ENABLED", "source":"", + "selector":"", "targeting": { "if": [ { @@ -1023,7 +1025,6 @@ func TestState_Evaluator(t *testing.T) { }, "defaultVariant": "recursive", "state": "ENABLED", - "source":"", "targeting": { "if": [ { @@ -1077,6 +1078,7 @@ func TestState_Evaluator(t *testing.T) { "defaultVariant": "recursive", "state": "ENABLED", "source":"", + "selector":"", "targeting": { "if": [ { @@ -1095,6 +1097,7 @@ func TestState_Evaluator(t *testing.T) { }, "defaultVariant": "off", "source":"", + "selector":"", "targeting": { "if": [ { diff --git a/core/pkg/model/flag.go b/core/pkg/model/flag.go index c83066518..342b0488b 100644 --- a/core/pkg/model/flag.go +++ b/core/pkg/model/flag.go @@ -8,6 +8,7 @@ type Flag struct { Variants map[string]any `json:"variants"` Targeting json.RawMessage `json:"targeting,omitempty"` Source string `json:"source"` + Selector string `json:"selector"` } type Evaluators struct { diff --git a/core/pkg/store/flags.go b/core/pkg/store/flags.go index 5e2c973c4..10409ffd9 100644 --- a/core/pkg/store/flags.go +++ b/core/pkg/store/flags.go @@ -103,7 +103,8 @@ func (f *Flags) GetAll(_ context.Context) map[string]model.Flag { } // Add new flags from source. -func (f *Flags) Add(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} { +func (f *Flags) Add(logger *logger.Logger, source string, selector string, flags map[string]model.Flag, +) map[string]interface{} { notifications := map[string]interface{}{} for k, newFlag := range flags { @@ -127,6 +128,7 @@ func (f *Flags) Add(logger *logger.Logger, source string, flags map[string]model // Store the new version of the flag newFlag.Source = source + newFlag.Selector = selector f.Set(k, newFlag) } @@ -134,7 +136,8 @@ func (f *Flags) Add(logger *logger.Logger, source string, flags map[string]model } // Update existing flags from source. -func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} { +func (f *Flags) Update(logger *logger.Logger, source string, selector string, flags map[string]model.Flag, +) map[string]interface{} { notifications := map[string]interface{}{} for k, flag := range flags { @@ -165,6 +168,7 @@ func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]mo } flag.Source = source + flag.Selector = selector f.Set(k, flag) } @@ -228,16 +232,18 @@ func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[stri } // Merge provided flags from source with currently stored flags. +// nolint: funlen func (f *Flags) Merge( logger *logger.Logger, source string, + selector string, flags map[string]model.Flag, ) (map[string]interface{}, bool) { notifications := map[string]interface{}{} resyncRequired := false f.mx.Lock() for k, v := range f.Flags { - if v.Source == source { + if v.Source == source && v.Selector == selector { if _, ok := flags[k]; !ok { // flag has been deleted delete(f.Flags, k) @@ -259,6 +265,7 @@ func (f *Flags) Merge( f.mx.Unlock() for k, newFlag := range flags { newFlag.Source = source + newFlag.Selector = selector storedFlag, ok := f.Get(context.Background(), k) if ok { if !f.hasPriority(storedFlag.Source, source) { diff --git a/core/pkg/store/flags_test.go b/core/pkg/store/flags_test.go index c89b2b643..c112ff84c 100644 --- a/core/pkg/store/flags_test.go +++ b/core/pkg/store/flags_test.go @@ -1,6 +1,7 @@ package store import ( + "reflect" "testing" "github.com/open-feature/flagd/core/pkg/logger" @@ -73,52 +74,38 @@ func TestHasPriority(t *testing.T) { func TestMergeFlags(t *testing.T) { t.Parallel() tests := []struct { - name string - current *Flags - new map[string]model.Flag - newSource string - want *Flags - wantNotifs map[string]interface{} - wantResync bool + name string + current *Flags + new map[string]model.Flag + newSource string + newSelector string + want *Flags + wantNotifs map[string]interface{} + wantResync bool }{ { - name: "both nil", - current: &Flags{ - Flags: nil, - }, + name: "both nil", + current: &Flags{Flags: nil}, new: nil, - want: &Flags{Flags: map[string]model.Flag{}}, - wantNotifs: map[string]interface{}{}, - }, - { - name: "both empty flags", - current: &Flags{ - Flags: map[string]model.Flag{}, - }, - new: map[string]model.Flag{}, - want: &Flags{Flags: map[string]model.Flag{}}, + want: &Flags{Flags: nil}, wantNotifs: map[string]interface{}{}, }, { - name: "empty current", - current: &Flags{ - Flags: nil, - }, + name: "both empty flags", + current: &Flags{Flags: map[string]model.Flag{}}, new: map[string]model.Flag{}, want: &Flags{Flags: map[string]model.Flag{}}, wantNotifs: map[string]interface{}{}, }, { - name: "empty new", - current: &Flags{ - Flags: map[string]model.Flag{}, - }, + name: "empty new", + current: &Flags{Flags: map[string]model.Flag{}}, new: nil, want: &Flags{Flags: map[string]model.Flag{}}, wantNotifs: map[string]interface{}{}, }, { - name: "extra fields on each", + name: "merging with new source", current: &Flags{ Flags: map[string]model.Flag{ "waka": { @@ -143,15 +130,14 @@ func TestMergeFlags(t *testing.T) { Source: "2", }, }}, - wantNotifs: map[string]interface{}{ - "paka": map[string]interface{}{"type": "write", "source": "2"}, - }, + wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write", "source": "2"}}, }, { - name: "override", - current: &Flags{ - Flags: map[string]model.Flag{"waka": {DefaultVariant: "off"}}, - }, + name: "override by new update", + current: &Flags{Flags: map[string]model.Flag{ + "waka": {DefaultVariant: "off"}, + "paka": {DefaultVariant: "off"}, + }}, new: map[string]model.Flag{ "waka": {DefaultVariant: "on"}, "paka": {DefaultVariant: "on"}, @@ -162,11 +148,11 @@ func TestMergeFlags(t *testing.T) { }}, wantNotifs: map[string]interface{}{ "waka": map[string]interface{}{"type": "update", "source": ""}, - "paka": map[string]interface{}{"type": "write", "source": ""}, + "paka": map[string]interface{}{"type": "update", "source": ""}, }, }, { - name: "identical", + name: "identical update so empty notifications", current: &Flags{ Flags: map[string]model.Flag{"hello": {DefaultVariant: "off"}}, }, @@ -179,20 +165,26 @@ func TestMergeFlags(t *testing.T) { wantNotifs: map[string]interface{}{}, }, { - name: "deleted flag", - current: &Flags{ - Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}}, - }, - new: map[string]model.Flag{}, - newSource: "A", - want: &Flags{Flags: map[string]model.Flag{}}, - wantNotifs: map[string]interface{}{ - "hello": map[string]interface{}{"type": "delete", "source": "A"}, - }, + name: "deleted flag & trigger resync for same source", + current: &Flags{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}}}, + new: map[string]model.Flag{}, + newSource: "A", + want: &Flags{Flags: map[string]model.Flag{}}, + wantNotifs: map[string]interface{}{"hello": map[string]interface{}{"type": "delete", "source": "A"}}, wantResync: true, }, { - name: "no merge priority", + name: "no deleted & no resync for same source but different selector", + current: &Flags{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}}, + new: map[string]model.Flag{}, + newSource: "A", + newSelector: "Y", + want: &Flags{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}}, + wantResync: false, + wantNotifs: map[string]interface{}{}, + }, + { + name: "no merge due to low priority", current: &Flags{ FlagSources: []string{ "B", @@ -205,9 +197,7 @@ func TestMergeFlags(t *testing.T) { }, }, }, - new: map[string]model.Flag{ - "hello": {DefaultVariant: "off"}, - }, + new: map[string]model.Flag{"hello": {DefaultVariant: "off"}}, newSource: "B", want: &Flags{ FlagSources: []string{ @@ -229,8 +219,9 @@ func TestMergeFlags(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.new) - require.Equal(t, tt.want, tt.want) + gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new) + + require.True(t, reflect.DeepEqual(tt.want, tt.current)) require.Equal(t, tt.wantNotifs, gotNotifs) require.Equal(t, tt.wantResync, resyncRequired) }) @@ -243,8 +234,9 @@ func TestFlags_Add(t *testing.T) { mockOverrideSource := "source-2" type request struct { - source string - flags map[string]model.Flag + source string + selector string + flags map[string]model.Flag } tests := []struct { @@ -321,7 +313,7 @@ func TestFlags_Add(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.flags) + messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.selector, tt.addRequest.flags) require.Equal(t, tt.storedState, tt.expectedState) @@ -339,8 +331,9 @@ func TestFlags_Update(t *testing.T) { mockOverrideSource := "source-2" type request struct { - source string - flags map[string]model.Flag + source string + selector string + flags map[string]model.Flag } tests := []struct { @@ -437,7 +430,8 @@ func TestFlags_Update(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source, tt.UpdateRequest.flags) + messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source, + tt.UpdateRequest.selector, tt.UpdateRequest.flags) require.Equal(t, tt.storedState, tt.expectedState) diff --git a/core/pkg/sync/grpc/grpc_sync.go b/core/pkg/sync/grpc/grpc_sync.go index 0c2eb6f82..33263bf8f 100644 --- a/core/pkg/sync/grpc/grpc_sync.go +++ b/core/pkg/sync/grpc/grpc_sync.go @@ -76,7 +76,7 @@ func (g *Sync) Init(ctx context.Context) error { } func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { - res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{}) + res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ProviderId: g.ProviderID, Selector: g.Selector}) if err != nil { err = fmt.Errorf("error fetching all flags: %w", err) g.Logger.Error(err.Error()) @@ -181,6 +181,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync <- sync.DataSync{ FlagData: data.FlagConfiguration, Source: g.URI, + Selector: g.Selector, Type: sync.ALL, } diff --git a/core/pkg/sync/isync.go b/core/pkg/sync/isync.go index 7fba573f8..ce8a35635 100644 --- a/core/pkg/sync/isync.go +++ b/core/pkg/sync/isync.go @@ -57,6 +57,7 @@ type ISync interface { type DataSync struct { FlagData string Source string + Selector string Type }