Skip to content

Commit

Permalink
feat: support UpdatableWatcher interface (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rainshaw authored Sep 7, 2022
1 parent 06fcda7 commit 69ad23b
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/casbin/redis-watcher/v2
go 1.18

require (
github.com/casbin/casbin/v2 v2.53.1
github.com/casbin/casbin/v2 v2.54.0
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.3.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/casbin/casbin/v2 v2.53.1 h1:uD/1LMHEPOkn1Xw5UmLnOJxdBPI7Zz85VbdPLJhivxo=
github.com/casbin/casbin/v2 v2.53.1/go.mod h1:vByNa/Fchek0KZUgG5wEsl7iFsiviAYKRtgrQfcJqHg=
github.com/casbin/casbin/v2 v2.54.0 h1:NFQ3Xkw6rfbD/rwEHMVRHVP5gUxhNQKdcZCb53pwSrA=
github.com/casbin/casbin/v2 v2.54.0/go.mod h1:vByNa/Fchek0KZUgG5wEsl7iFsiviAYKRtgrQfcJqHg=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Expand Down
102 changes: 76 additions & 26 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ func DefaultUpdateCallback(e casbin.IEnforcer) func(string) {
err = e.LoadPolicy()
res = true
case UpdateForAddPolicy:
res, err = e.SelfAddPolicy(msgStruct.Sec, msgStruct.Ptype, msgStruct.Rule)
res, err = e.SelfAddPolicy(msgStruct.Sec, msgStruct.Ptype, msgStruct.NewRule)
case UpdateForAddPolicies:
res, err = e.SelfAddPolicies(msgStruct.Sec, msgStruct.Ptype, msgStruct.Rules)
res, err = e.SelfAddPolicies(msgStruct.Sec, msgStruct.Ptype, msgStruct.NewRules)
case UpdateForRemovePolicy:
res, err = e.SelfRemovePolicy(msgStruct.Sec, msgStruct.Ptype, msgStruct.Rule)
res, err = e.SelfRemovePolicy(msgStruct.Sec, msgStruct.Ptype, msgStruct.NewRule)
case UpdateForRemoveFilteredPolicy:
res, err = e.SelfRemoveFilteredPolicy(msgStruct.Sec, msgStruct.Ptype, msgStruct.FieldIndex, msgStruct.FieldValues...)
case UpdateForRemovePolicies:
res, err = e.SelfRemovePolicies(msgStruct.Sec, msgStruct.Ptype, msgStruct.Rules)
res, err = e.SelfRemovePolicies(msgStruct.Sec, msgStruct.Ptype, msgStruct.NewRules)
case UpdateForUpdatePolicy:
res, err = e.SelfUpdatePolicy(msgStruct.Sec, msgStruct.Ptype, msgStruct.OldRule, msgStruct.NewRule)
case UpdateForUpdatePolicies:
res, err = e.SelfUpdatePolicies(msgStruct.Sec, msgStruct.Ptype, msgStruct.OldRules, msgStruct.NewRules)
default:
err = errors.New("unknown update type")
}
Expand All @@ -67,8 +71,10 @@ type MSG struct {
ID string
Sec string
Ptype string
Rule []string
Rules [][]string
OldRule []string
OldRules [][]string
NewRule []string
NewRules [][]string
FieldIndex int
FieldValues []string
}
Expand All @@ -83,6 +89,8 @@ const (
UpdateForSavePolicy UpdateType = "UpdateForSavePolicy"
UpdateForAddPolicies UpdateType = "UpdateForAddPolicies"
UpdateForRemovePolicies UpdateType = "UpdateForRemovePolicies"
UpdateForUpdatePolicy UpdateType = "UpdateForUpdatePolicy"
UpdateForUpdatePolicies UpdateType = "UpdateForUpdatePolicies"
)

func (m *MSG) MarshalBinary() ([]byte, error) {
Expand Down Expand Up @@ -262,11 +270,11 @@ func (w *Watcher) UpdateForAddPolicy(sec, ptype string, params ...string) error
context.Background(),
w.options.Channel,
&MSG{
Method: UpdateForAddPolicy,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
Rule: params,
Method: UpdateForAddPolicy,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
NewRule: params,
}).Err()
})
}
Expand All @@ -281,11 +289,11 @@ func (w *Watcher) UpdateForRemovePolicy(sec, ptype string, params ...string) err
context.Background(),
w.options.Channel,
&MSG{
Method: UpdateForRemovePolicy,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
Rule: params,
Method: UpdateForRemovePolicy,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
NewRule: params,
},
).Err()
})
Expand Down Expand Up @@ -339,11 +347,11 @@ func (w *Watcher) UpdateForAddPolicies(sec string, ptype string, rules ...[]stri
context.Background(),
w.options.Channel,
&MSG{
Method: UpdateForAddPolicies,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
Rules: rules,
Method: UpdateForAddPolicies,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
NewRules: rules,
},
).Err()
})
Expand All @@ -359,11 +367,53 @@ func (w *Watcher) UpdateForRemovePolicies(sec string, ptype string, rules ...[]s
context.Background(),
w.options.Channel,
&MSG{
Method: UpdateForRemovePolicies,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
Rules: rules,
Method: UpdateForRemovePolicies,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
NewRules: rules,
},
).Err()
})
}

// UpdateForUpdatePolicy calls the update callback of other instances to synchronize their policy.
// It is called after Enforcer.UpdatePolicy()
func (w *Watcher) UpdateForUpdatePolicy(sec string, ptype string, oldRule, newRule []string) error {
return w.logRecord(func() error {
w.l.Lock()
defer w.l.Unlock()
return w.pubClient.Publish(
context.Background(),
w.options.Channel,
&MSG{
Method: UpdateForUpdatePolicy,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
OldRule: oldRule,
NewRule: newRule,
},
).Err()
})
}

// UpdateForUpdatePolicies calls the update callback of other instances to synchronize their policy.
// It is called after Enforcer.UpdatePolicies()
func (w *Watcher) UpdateForUpdatePolicies(sec string, ptype string, oldRules, newRules [][]string) error {
return w.logRecord(func() error {
w.l.Lock()
defer w.l.Unlock()
return w.pubClient.Publish(
context.Background(),
w.options.Channel,
&MSG{
Method: UpdateForUpdatePolicies,
ID: w.options.LocalID,
Sec: sec,
Ptype: ptype,
OldRules: oldRules,
NewRules: newRules,
},
).Err()
})
Expand Down
78 changes: 59 additions & 19 deletions watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,21 @@ func TestWatcherWithIgnoreSelfTrue(t *testing.T) {
}

func TestUpdate(t *testing.T) {
wo := rediswatcher.WatcherOptions{
IgnoreSelf: true,
}
e, w := initWatcherWithOptions(t, wo)
e2, w2 := initWatcherWithOptions(t, wo)

time.Sleep(time.Millisecond * 500)
_, _ = e.UpdatePolicy([]string{"alice", "data1", "read"}, []string{"alice", "data1", "write"})
_ = e.LoadPolicy()

time.Sleep(time.Millisecond * 500)
if !reflect.DeepEqual(e2.GetPolicy(), e.GetPolicy()) {
t.Log("Method", "Update")
t.Log("e.policy", e.GetPolicy())
t.Log("e2.policy", e2.GetPolicy())
t.Error("These two enforcers' policies should be equal")
}
_, w := initWatcher(t)
_ = w.SetUpdateCallback(func(s string) {
msgStruct := &rediswatcher.MSG{}

err := msgStruct.UnmarshalBinary([]byte(s))
if err != nil {
t.Error(err)
return
}
if msgStruct.Method != "Update" {
t.Errorf("Method should be Update instead of %s", msgStruct.Method)
}
})
_ = w.Update()
w.Close()
w2.Close()
time.Sleep(time.Millisecond * 500)
time.Sleep(time.Millisecond * 500)
}

Expand Down Expand Up @@ -258,3 +252,49 @@ func TestUpdateForRemovePolicies(t *testing.T) {
w2.Close()
time.Sleep(time.Millisecond * 500)
}

func TestUpdateForUpdatePolicy(t *testing.T) {
wo := rediswatcher.WatcherOptions{
IgnoreSelf: true,
}
e, w := initWatcherWithOptions(t, wo)
e2, w2 := initWatcherWithOptions(t, wo)

time.Sleep(time.Millisecond * 500)
_, _ = e.UpdatePolicy([]string{"alice", "data1", "read"}, []string{"alice", "data1", "write"})

time.Sleep(time.Millisecond * 500)
if !reflect.DeepEqual(e2.GetPolicy(), e.GetPolicy()) {
t.Log("Method", "UpdatePolicy")
t.Log("e.policy", e.GetPolicy())
t.Log("e2.policy", e2.GetPolicy())
t.Error("These two enforcers' policies should be equal")
}

w.Close()
w2.Close()
time.Sleep(time.Millisecond * 500)
}

func TestUpdateForUpdatePolicies(t *testing.T) {
wo := rediswatcher.WatcherOptions{
IgnoreSelf: true,
}
e, w := initWatcherWithOptions(t, wo)
e2, w2 := initWatcherWithOptions(t, wo)

time.Sleep(time.Millisecond * 500)
_, _ = e.UpdatePolicies([][]string{{"alice", "data1", "read"}}, [][]string{{"alice", "data1", "write"}})

time.Sleep(time.Millisecond * 500)
if !reflect.DeepEqual(e2.GetPolicy(), e.GetPolicy()) {
t.Log("Method", "UpdatePolicies")
t.Log("e.policy", e.GetPolicy())
t.Log("e2.policy", e2.GetPolicy())
t.Error("These two enforcers' policies should be equal")
}

w.Close()
w2.Close()
time.Sleep(time.Millisecond * 500)
}

0 comments on commit 69ad23b

Please sign in to comment.