diff --git a/README.md b/README.md index 5805b42..cf3fff5 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ events ```go e := events.New[string]() -e.Attach(sub1) -e.Attach(sub2) +e.Subscribe(sub1) +e.Subscribe(sub2) e.Publish(true, "触发事件1") // sub1 和 sub2 均会收事事件 ``` diff --git a/events.go b/events.go index 4385286..3ec02ff 100644 --- a/events.go +++ b/events.go @@ -7,12 +7,12 @@ // e := events.New[string]() // // // 订阅事件 -// e.Attach(func(data string){ +// e.Subscribe(func(data string){ // fmt.Println("subscriber 1:", data) // }) // // // 订阅事件 -// e.Attach(func(data string){ +// e.Subscribe(func(data string){ // fmt.Println("subscriber 2:", data) // }) // @@ -20,24 +20,19 @@ package events import ( - "errors" + "context" + "reflect" "sync" ) -var errStopped = errors.New("events: stopped") - // SubscribeFunc 订阅者函数 // -// 每个订阅函数都是通过 go 异步执行。 -// // data 为事件传递过来的数据,可能存在多个订阅者, // 用户不应该直接修改 data 数据,否则结果是未知的。 type SubscribeFunc[T any] func(data T) type event[T any] struct { - locker sync.RWMutex - count int - funcs map[int]SubscribeFunc[T] + funcs *sync.Map } // Publisher 事件的发布者 @@ -46,21 +41,15 @@ type Publisher[T any] interface { // // sync 表示订阅者是否以异步的方式执行; // data 传递给订阅者的数据; - Publish(sync bool, data T) error - - // Destroy 销毁当前事件处理程序 - Destroy() + Publish(sync bool, data T) } // Subscriber 供用户订阅事件的对象接口 type Subscriber[T any] interface { - // Attach 注册订阅者 + // Subscribe 注册订阅事件 // - // 返回唯一 ID,用户可以使用此 ID 取消订阅。 - Attach(SubscribeFunc[T]) (int, error) - - // Detach 取消指定事件的订阅 - Detach(int) + // 返回用于注销此订阅事件的方法。 + Subscribe(SubscribeFunc[T]) (context.CancelFunc, error) } type Eventer[T any] interface { @@ -68,70 +57,32 @@ type Eventer[T any] interface { Subscriber[T] } -// errStopped 表示发布都已经调用 [Publisher.Destroy] 销毁了事件处理器 -func ErrStopped() error { return errStopped } - // New 声明一个新的事件处理 // // T 为事件传递过程的参数类型; func New[T any]() Eventer[T] { return &event[T]{ - funcs: make(map[int]SubscribeFunc[T], 5), + funcs: &sync.Map{}, } } -func (e *event[T]) Publish(sync bool, data T) error { - // 初如化时将 e.funcs 设置为了非 nil 状态, - // 所以为 nil 表示已经调用 [Publisher.Destroy] - if e.funcs == nil { - return ErrStopped() - } - - e.locker.RLock() - defer e.locker.RUnlock() - - if len(e.funcs) == 0 { - return nil - } - +func (e *event[T]) Publish(sync bool, data T) { if sync { - for _, s := range e.funcs { - go func(sub SubscribeFunc[T]) { - sub(data) - }(s) - } + e.funcs.Range(func(key, value any) bool { + go func(sub SubscribeFunc[T]) { sub(data) }(value.(SubscribeFunc[T])) + return true + }) } else { - for _, s := range e.funcs { - s(data) - } + e.funcs.Range(func(key, value any) bool { + value.(SubscribeFunc[T])(data) + return true + }) } - - return nil } -func (e *event[T]) Destroy() { - e.locker.Lock() - e.funcs = nil - e.locker.Unlock() -} - -func (e *event[T]) Attach(subscriber SubscribeFunc[T]) (int, error) { - if e.funcs == nil { - return 0, ErrStopped() - } - - ret := e.count - - e.locker.Lock() - e.count++ - e.funcs[ret] = subscriber - e.locker.Unlock() - - return ret, nil -} +func (e *event[T]) Subscribe(subscriber SubscribeFunc[T]) (context.CancelFunc, error) { + ptr := reflect.ValueOf(subscriber).Pointer() + e.funcs.Store(ptr, subscriber) -func (e *event[T]) Detach(id int) { - e.locker.Lock() - delete(e.funcs, id) - e.locker.Unlock() + return func() { e.funcs.Delete(ptr) }, nil } diff --git a/events_test.go b/events_test.go index 9068c5e..d2f88c9 100644 --- a/events_test.go +++ b/events_test.go @@ -28,43 +28,38 @@ func TestPublisher_Publish(t *testing.T) { a.NotNil(e) // 没有订阅者 - a.NotError(e.Publish(true, "123")) + e.Publish(true, "123") buf1 := new(bytes.Buffer) sub1 := func(data string) { buf1.WriteString(data) } - buf2 := new(bytes.Buffer) - sub2 := func(data string) { - buf2.WriteString(data) - } - - id1, err := e.Attach(sub1) - a.NotError(err) + c1, err := e.Subscribe(sub1) + a.NotError(err).NotNil(c1) e.Publish(true, "p1") time.Sleep(time.Microsecond * 500) a.Equal(buf1.String(), "p1") - a.Empty(buf2.Bytes()) buf1.Reset() - buf2.Reset() - e.Attach(sub2) - a.NotError(e.Publish(false, "p2")) + buf2 := new(bytes.Buffer) + sub2 := func(data string) { + buf2.WriteString(data) + } + a.Empty(buf2.Bytes()) + e.Subscribe(sub2) + e.Publish(false, "p2") time.Sleep(time.Microsecond * 500) a.Equal(buf1.String(), "p2") a.Equal(buf2.String(), "p2") buf1.Reset() buf2.Reset() - e.Detach(id1) - a.NotError(e.Publish(false, "p3")) + c1() + e.Publish(false, "p3") time.Sleep(time.Microsecond * 500) a.Empty(buf1.String()) a.Equal(buf2.String(), "p3") - - e.Destroy() - a.Error(e.Publish(false, "p4")) } func TestPublisher_Destroy(t *testing.T) { @@ -72,16 +67,14 @@ func TestPublisher_Destroy(t *testing.T) { e := New[string]() a.NotNil(e) - e.Destroy() ee, ok := e.(*(event[string])) - a.True(ok).NotNil(ee).Nil(ee.funcs) + a.True(ok).NotNil(ee).Zero(ee.len()) e = New[string]() a.NotNil(e) - e.Attach(s1) - e.Destroy() + e.Subscribe(s1) ee, ok = e.(*(event[string])) - a.True(ok).NotNil(ee).Nil(ee.funcs) + a.True(ok).NotNil(ee).Equal(ee.len(), 1) } func TestSubscriber_Attach_Detach(t *testing.T) { @@ -89,23 +82,26 @@ func TestSubscriber_Attach_Detach(t *testing.T) { e := New[string]() a.NotNil(e) - id1, err := e.Attach(s1) + c1, err := e.Subscribe(s1) a.NotError(err) - id2, err := e.Attach(s2) + c2, err := e.Subscribe(s2) a.NotError(err) ee, ok := e.(*(event[string])) a.True(ok).NotNil(ee) - a.Equal(len(ee.funcs), 2) + a.Equal(ee.len(), 2) - e.Detach(id1) - a.Equal(len(ee.funcs), 1) + c1() + a.Equal(ee.len(), 1) - e.Detach(id2) - a.Equal(len(ee.funcs), 0) - - // Destroy + c2() + a.Equal(ee.len(), 0) +} - e.Destroy() - e.Attach(s1) +func (e *event[T]) len() (c int) { + e.funcs.Range(func(key, value any) bool { + c++ + return true + }) + return }