Skip to content

Commit

Permalink
refactor: 重构代码
Browse files Browse the repository at this point in the history
作了以下修改:
  - Subscriber 接口将两个方法并为一个,从订阅函数返回删除的方法;
  - 删除了 Publisher.Destroy;
  - 采用 sync.Map 代替原有的 map 实现;
  - 删除不再需要的  ErrStopped 错误;
  • Loading branch information
caixw committed Apr 18, 2024
1 parent 7e3bb94 commit 9991d89
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 107 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ events
```go
e := events.New[string]()

e.Attach(sub1)
e.Attach(sub2)
e.Subscribe(sub1)
e.Subscribe(sub2)

e.Publish(true, "触发事件1") // sub1 和 sub2 均会收事事件
```
Expand Down
95 changes: 23 additions & 72 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,32 @@
// e := events.New[string]()
//
// // 订阅事件
// e.Attach(func(data string){
// e.Subscribe(func(data string){
// fmt.Println("subscriber 1:", data)
// })
//
// // 订阅事件
// e.Attach(func(data string){
// e.Subscribe(func(data string){
// fmt.Println("subscriber 2:", data)
// })
//
// e.Publish(true, "test") // 发布事件
package events

import (
"errors"
"context"
"reflect"
"sync"
)

var errStopped = errors.New("events: stopped")

// SubscribeFunc 订阅者函数
//
// 每个订阅函数都是通过 go 异步执行。
//
// data 为事件传递过来的数据,可能存在多个订阅者,
// 用户不应该直接修改 data 数据,否则结果是未知的。
type SubscribeFunc[T any] func(data T)

type event[T any] struct {
locker sync.RWMutex
count int
funcs map[int]SubscribeFunc[T]
funcs *sync.Map
}

// Publisher 事件的发布者
Expand All @@ -46,92 +41,48 @@ type Publisher[T any] interface {
//
// sync 表示订阅者是否以异步的方式执行;
// data 传递给订阅者的数据;
Publish(sync bool, data T) error

// Destroy 销毁当前事件处理程序
Destroy()
Publish(sync bool, data T)
}

// Subscriber 供用户订阅事件的对象接口
type Subscriber[T any] interface {
// Attach 注册订阅者
// Subscribe 注册订阅事件
//
// 返回唯一 ID,用户可以使用此 ID 取消订阅。
Attach(SubscribeFunc[T]) (int, error)

// Detach 取消指定事件的订阅
Detach(int)
// 返回用于注销此订阅事件的方法。
Subscribe(SubscribeFunc[T]) (context.CancelFunc, error)
}

type Eventer[T any] interface {
Publisher[T]
Subscriber[T]
}

// errStopped 表示发布都已经调用 [Publisher.Destroy] 销毁了事件处理器
func ErrStopped() error { return errStopped }

// New 声明一个新的事件处理
//
// T 为事件传递过程的参数类型;
func New[T any]() Eventer[T] {
return &event[T]{
funcs: make(map[int]SubscribeFunc[T], 5),
funcs: &sync.Map{},
}
}

func (e *event[T]) Publish(sync bool, data T) error {
// 初如化时将 e.funcs 设置为了非 nil 状态,
// 所以为 nil 表示已经调用 [Publisher.Destroy]
if e.funcs == nil {
return ErrStopped()
}

e.locker.RLock()
defer e.locker.RUnlock()

if len(e.funcs) == 0 {
return nil
}

func (e *event[T]) Publish(sync bool, data T) {
if sync {
for _, s := range e.funcs {
go func(sub SubscribeFunc[T]) {
sub(data)
}(s)
}
e.funcs.Range(func(key, value any) bool {
go func(sub SubscribeFunc[T]) { sub(data) }(value.(SubscribeFunc[T]))
return true
})
} else {
for _, s := range e.funcs {
s(data)
}
e.funcs.Range(func(key, value any) bool {
value.(SubscribeFunc[T])(data)
return true
})
}

return nil
}

func (e *event[T]) Destroy() {
e.locker.Lock()
e.funcs = nil
e.locker.Unlock()
}

func (e *event[T]) Attach(subscriber SubscribeFunc[T]) (int, error) {
if e.funcs == nil {
return 0, ErrStopped()
}

ret := e.count

e.locker.Lock()
e.count++
e.funcs[ret] = subscriber
e.locker.Unlock()

return ret, nil
}
func (e *event[T]) Subscribe(subscriber SubscribeFunc[T]) (context.CancelFunc, error) {
ptr := reflect.ValueOf(subscriber).Pointer()
e.funcs.Store(ptr, subscriber)

func (e *event[T]) Detach(id int) {
e.locker.Lock()
delete(e.funcs, id)
e.locker.Unlock()
return func() { e.funcs.Delete(ptr) }, nil
}
62 changes: 29 additions & 33 deletions events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,84 +28,80 @@ func TestPublisher_Publish(t *testing.T) {
a.NotNil(e)

// 没有订阅者
a.NotError(e.Publish(true, "123"))
e.Publish(true, "123")

buf1 := new(bytes.Buffer)
sub1 := func(data string) {
buf1.WriteString(data)
}

buf2 := new(bytes.Buffer)
sub2 := func(data string) {
buf2.WriteString(data)
}

id1, err := e.Attach(sub1)
a.NotError(err)
c1, err := e.Subscribe(sub1)
a.NotError(err).NotNil(c1)
e.Publish(true, "p1")
time.Sleep(time.Microsecond * 500)
a.Equal(buf1.String(), "p1")
a.Empty(buf2.Bytes())

buf1.Reset()
buf2.Reset()
e.Attach(sub2)
a.NotError(e.Publish(false, "p2"))
buf2 := new(bytes.Buffer)
sub2 := func(data string) {
buf2.WriteString(data)
}
a.Empty(buf2.Bytes())
e.Subscribe(sub2)
e.Publish(false, "p2")
time.Sleep(time.Microsecond * 500)
a.Equal(buf1.String(), "p2")
a.Equal(buf2.String(), "p2")

buf1.Reset()
buf2.Reset()
e.Detach(id1)
a.NotError(e.Publish(false, "p3"))
c1()
e.Publish(false, "p3")
time.Sleep(time.Microsecond * 500)
a.Empty(buf1.String())
a.Equal(buf2.String(), "p3")

e.Destroy()
a.Error(e.Publish(false, "p4"))
}

func TestPublisher_Destroy(t *testing.T) {
a := assert.New(t, false)

e := New[string]()
a.NotNil(e)
e.Destroy()
ee, ok := e.(*(event[string]))
a.True(ok).NotNil(ee).Nil(ee.funcs)
a.True(ok).NotNil(ee).Zero(ee.len())

e = New[string]()
a.NotNil(e)
e.Attach(s1)
e.Destroy()
e.Subscribe(s1)
ee, ok = e.(*(event[string]))
a.True(ok).NotNil(ee).Nil(ee.funcs)
a.True(ok).NotNil(ee).Equal(ee.len(), 1)
}

func TestSubscriber_Attach_Detach(t *testing.T) {
a := assert.New(t, false)
e := New[string]()
a.NotNil(e)

id1, err := e.Attach(s1)
c1, err := e.Subscribe(s1)
a.NotError(err)
id2, err := e.Attach(s2)
c2, err := e.Subscribe(s2)
a.NotError(err)
ee, ok := e.(*(event[string]))
a.True(ok).NotNil(ee)

a.Equal(len(ee.funcs), 2)
a.Equal(ee.len(), 2)

e.Detach(id1)
a.Equal(len(ee.funcs), 1)
c1()
a.Equal(ee.len(), 1)

e.Detach(id2)
a.Equal(len(ee.funcs), 0)

// Destroy
c2()
a.Equal(ee.len(), 0)
}

e.Destroy()
e.Attach(s1)
func (e *event[T]) len() (c int) {
e.funcs.Range(func(key, value any) bool {
c++
return true
})
return
}

0 comments on commit 9991d89

Please sign in to comment.