Skip to content

Commit

Permalink
DomainEvent Retry.
Browse files Browse the repository at this point in the history
  • Loading branch information
8treenet committed Apr 24, 2021
1 parent b8bc556 commit 0602c27
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 80 deletions.
32 changes: 18 additions & 14 deletions example/fshop/infra/domainevent/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (manager *EventManager) Booting(bootManager freedom.BootManager) {
}

//重试参考 example/infra-example
//manager.eventRetry.Booting() //启动重试
}

// GetPublisherChan .
Expand All @@ -58,13 +57,13 @@ func (manager *EventManager) GetPublisherChan() <-chan freedom.DomainEvent {
}

// SetRetryPolicy Set the rules for retry.
func (manager *EventManager) SetRetryPolicy(delay, interval time.Duration, retries int) {
manager.eventRetry.SetRetryPolicy(delay, interval, retries)
func (manager *EventManager) SetRetryPolicy(delay time.Duration, retries int) {
manager.eventRetry.setRetryPolicy(delay, retries)
}

// push EventTransaction在事务成功后触发 .
func (manager *EventManager) push(event freedom.DomainEvent) {
freedom.Logger().Infof("Publish Event Topic:%s, %+v", event.Topic(), event)
freedom.Logger().Debugf("Publish Event Topic:%s, %+v", event.Topic(), event)
identity := event.Identity()
go func() {
defer func() {
Expand All @@ -86,7 +85,7 @@ func (manager *EventManager) push(event freedom.DomainEvent) {
*/
manager.publisherChan <- event

if !manager.eventRetry.PubExist(event.Topic()) {
if !manager.eventRetry.pubExist(event.Topic()) {
return //未注册重试,结束
}

Expand Down Expand Up @@ -115,7 +114,7 @@ func (manager *EventManager) Save(repo *freedom.Repository, entity freedom.Entit

//Insert PubEvent
for _, domainEvent := range entity.GetPubEvent() {
if !manager.eventRetry.PubExist(domainEvent.Topic()) {
if !manager.eventRetry.pubExist(domainEvent.Topic()) {
continue //未注册重试,无需存储
}

Expand All @@ -139,7 +138,7 @@ func (manager *EventManager) Save(repo *freedom.Repository, entity freedom.Entit

//Delete SubEvent
for _, subEvent := range entity.GetSubEvent() {
if !manager.eventRetry.SubExist(subEvent.Topic()) {
if !manager.eventRetry.subExist(subEvent.Topic()) {
continue //未注册重试,无需修改
}

Expand All @@ -153,7 +152,7 @@ func (manager *EventManager) Save(repo *freedom.Repository, entity freedom.Entit

// InsertSubEvent .
func (manager *EventManager) InsertSubEvent(event freedom.DomainEvent) error {
if !manager.eventRetry.SubExist(event.Topic()) {
if !manager.eventRetry.subExist(event.Topic()) {
return nil //未注册重试,无需存储
}

Expand All @@ -172,14 +171,19 @@ func (manager *EventManager) InsertSubEvent(event freedom.DomainEvent) error {
return manager.db().Create(&model).Error //插入消费事件表。
}

// RetryPubEvent .
func (manager *EventManager) RetryPubEvent(event freedom.DomainEvent) {
manager.eventRetry.RetryPubEvent(event)
// Retry .
func (manager *EventManager) Retry() {
manager.eventRetry.retry()
}

// RetrySubEvent .
func (manager *EventManager) RetrySubEvent(event freedom.DomainEvent, function interface{}) {
manager.eventRetry.RetrySubEvent(event, function)
// BindRetryPubEvent .
func (manager *EventManager) BindRetryPubEvent(event freedom.DomainEvent) {
manager.eventRetry.bindRetryPubEvent(event)
}

// BindRetrySubEvent .
func (manager *EventManager) BindRetrySubEvent(event freedom.DomainEvent, function interface{}) {
manager.eventRetry.bindRetrySubEvent(event, function)
}

// addPubToWorker 增加发布事件到Worker.Store.
Expand Down
38 changes: 15 additions & 23 deletions example/fshop/infra/domainevent/event_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type subRetry struct {
function interface{}
}

func (retry *eventRetry) RetryPubEvent(event freedom.DomainEvent) {
func (retry *eventRetry) bindRetryPubEvent(event freedom.DomainEvent) {
topic := event.Topic()
if topic == "" {
panic("Topic Cannot be empty")
Expand All @@ -43,7 +43,7 @@ func (retry *eventRetry) RetryPubEvent(event freedom.DomainEvent) {
retry.pubPool[topic] = reflect.TypeOf(event)
}

func (retry *eventRetry) RetrySubEvent(event freedom.DomainEvent, fun interface{}) {
func (retry *eventRetry) bindRetrySubEvent(event freedom.DomainEvent, fun interface{}) {
topic := event.Topic()
if topic == "" {
panic("Topic Cannot be empty")
Expand All @@ -67,32 +67,24 @@ func (retry *eventRetry) RetrySubEvent(event freedom.DomainEvent, fun interface{
}
}

func (retry *eventRetry) PubExist(topic string) bool {
func (retry *eventRetry) pubExist(topic string) bool {
_, ok := retry.pubPool[topic]
return ok
}

func (retry *eventRetry) SubExist(topic string) bool {
func (retry *eventRetry) subExist(topic string) bool {
_, ok := retry.subPool[topic]
return ok
}

func (retry *eventRetry) SetRetryPolicy(delay, interval time.Duration, retries int) {
func (retry *eventRetry) setRetryPolicy(delay time.Duration, retries int) {
retry.delay = delay
retry.retries = retries
retry.interval = interval
}

func (retry *eventRetry) Booting() {
go func() {
time.Sleep(5 * time.Second)
timer := time.NewTimer(retry.interval)
for range timer.C {
retry.scanSub()
retry.scanPub()
timer.Reset(retry.interval)
}
}()
func (retry *eventRetry) retry() {
retry.scanSub()
retry.scanPub()
}

func (retry *eventRetry) scanSub() {
Expand All @@ -106,7 +98,7 @@ func (retry *eventRetry) scanSub() {

var filterList []*subEventObject
for _, po := range list {
if !retry.SubExist(po.Topic) {
if !retry.subExist(po.Topic) {
GetEventManager().db().Delete(&subEventObject{}, "identity = ?", po.Identity) //未注册重试,直接删除
continue
}
Expand Down Expand Up @@ -150,13 +142,13 @@ func (retry *eventRetry) scanPub() {
var list []*pubEventObject
err := GetEventManager().db().Where("retries < ? and created < ?", retry.retries, time.Now().Add(-retry.delay)).Order("sequence ASC").Limit(rows).Find(&list).Error
if err != nil {
freedom.Logger().Info("RetryPubEvent:", err)
freedom.Logger().Error("RetryPubEvent:", err)
return
}

var filterList []*pubEventObject
for _, po := range list {
if !retry.PubExist(po.Topic) {
if !retry.pubExist(po.Topic) {
GetEventManager().db().Delete(&pubEventObject{}, "identity = ?", po.Identity) //未注册重试,直接删除
continue
}
Expand Down Expand Up @@ -196,20 +188,20 @@ func (retry *eventRetry) callPub(po *pubEventObject) {
func parseRetryCallFunc(f interface{}) (inType reflect.Type, e error) {
ftype := reflect.TypeOf(f)
if ftype.Kind() != reflect.Func {
e = errors.New("It's not a func")
e = errors.New("it's not a func")
return
}
if ftype.NumIn() != 1 {
e = errors.New("The function's argument is wrong")
e = errors.New("the function's argument is wrong")
return
}
if ftype.NumOut() != 0 {
e = errors.New("The function's argument is wrong")
e = errors.New("the function's argument is wrong")
return
}
inType = ftype.In(0)
if inType.Kind() != reflect.Ptr {
e = errors.New("The function's argument is wrong")
e = errors.New("the function's argument is wrong")
return
}
return
Expand Down
1 change: 0 additions & 1 deletion example/fshop/infra/domainevent/event_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,4 @@ func (et *EventTransaction) pushEvent() {
for _, pubEvent := range pubEvents {
eventManager.push(pubEvent) //使用manager推送
}
return
}
2 changes: 1 addition & 1 deletion example/infra-example/adapter/controller/goods.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
})

//重试消费事件
domainevent.GetEventManager().RetrySubEvent(&event.ShopGoods{}, func(shopGoodsEvent *event.ShopGoods) {
domainevent.GetEventManager().BindRetrySubEvent(&event.ShopGoods{}, func(shopGoodsEvent *event.ShopGoods) {
freedom.ServiceLocator().Call(func(goodsSev *domain.GoodsService) {
goodsSev.ShopEvent(shopGoodsEvent)
})
Expand Down
2 changes: 1 addition & 1 deletion example/infra-example/domain/event/shop_goods.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func init() {
//不注册 不会触发重试
domainevent.GetEventManager().RetryPubEvent(&ShopGoods{})
domainevent.GetEventManager().BindRetryPubEvent(&ShopGoods{})
}

// ShopGoods 购买事件
Expand Down
33 changes: 18 additions & 15 deletions example/infra-example/infra/domainevent/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,16 @@ func (manager *EventManager) Booting(bootManager freedom.BootManager) {
if !manager.FetchSingleInfra(&manager.kafkaProducer) {
panic("No KafkaProducer found")
}

manager.eventRetry.Booting() //启动重试
}

// SetRetryPolicy Set the rules for retry.
func (manager *EventManager) SetRetryPolicy(delay, interval time.Duration, retries int) {
manager.eventRetry.SetRetryPolicy(delay, interval, retries)
func (manager *EventManager) SetRetryPolicy(delay time.Duration, retries int) {
manager.eventRetry.setRetryPolicy(delay, retries)
}

// push EventTransaction在事务成功后触发 .
func (manager *EventManager) push(event freedom.DomainEvent) {
freedom.Logger().Infof("Publish Event Topic:%s, %+v", event.Topic(), event)
freedom.Logger().Debugf("Publish Event Topic:%s, %+v", event.Topic(), event)
identity := event.Identity()
go func() {
defer func() {
Expand All @@ -81,7 +79,7 @@ func (manager *EventManager) push(event freedom.DomainEvent) {
return
}

if !manager.eventRetry.PubExist(event.Topic()) {
if !manager.eventRetry.pubExist(event.Topic()) {
return //未注册重试,结束
}

Expand Down Expand Up @@ -110,7 +108,7 @@ func (manager *EventManager) Save(repo *freedom.Repository, entity freedom.Entit

//Insert PubEvent
for _, domainEvent := range entity.GetPubEvent() {
if !manager.eventRetry.PubExist(domainEvent.Topic()) {
if !manager.eventRetry.pubExist(domainEvent.Topic()) {
continue //未注册重试,无需存储
}

Expand All @@ -134,7 +132,7 @@ func (manager *EventManager) Save(repo *freedom.Repository, entity freedom.Entit

//Delete SubEvent
for _, subEvent := range entity.GetSubEvent() {
if !manager.eventRetry.SubExist(subEvent.Topic()) {
if !manager.eventRetry.subExist(subEvent.Topic()) {
continue //未注册重试,无需修改
}

Expand All @@ -148,7 +146,7 @@ func (manager *EventManager) Save(repo *freedom.Repository, entity freedom.Entit

// InsertSubEvent .
func (manager *EventManager) InsertSubEvent(event freedom.DomainEvent) error {
if !manager.eventRetry.SubExist(event.Topic()) {
if !manager.eventRetry.subExist(event.Topic()) {
return nil //未注册重试,无需存储
}

Expand All @@ -167,14 +165,19 @@ func (manager *EventManager) InsertSubEvent(event freedom.DomainEvent) error {
return manager.db().Create(&model).Error //插入消费事件表。
}

// RetryPubEvent .
func (manager *EventManager) RetryPubEvent(event freedom.DomainEvent) {
manager.eventRetry.RetryPubEvent(event)
// Retry .
func (manager *EventManager) Retry() {
manager.eventRetry.retry()
}

// BindRetryPubEvent .
func (manager *EventManager) BindRetryPubEvent(event freedom.DomainEvent) {
manager.eventRetry.bindRetryPubEvent(event)
}

// RetrySubEvent .
func (manager *EventManager) RetrySubEvent(event freedom.DomainEvent, function interface{}) {
manager.eventRetry.RetrySubEvent(event, function)
// BindRetrySubEvent .
func (manager *EventManager) BindRetrySubEvent(event freedom.DomainEvent, function interface{}) {
manager.eventRetry.bindRetrySubEvent(event, function)
}

// addPubToWorker 增加发布事件到Worker.Store.
Expand Down
Loading

0 comments on commit 0602c27

Please sign in to comment.