Skip to content

Commit

Permalink
connect background process context to stop channel (#13657)
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny authored Jun 21, 2024
1 parent c758cda commit d298bb2
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions core/bridges/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Cache struct {

// service state
services.StateMachine
wg sync.WaitGroup
stop chan struct{}
wg sync.WaitGroup
chStop services.StopChan

// data state
bridgeTypesCache sync.Map
Expand All @@ -47,7 +47,7 @@ func NewCache(base ORM, lggr logger.Logger, upsertInterval time.Duration) *Cache
ORM: base,
lggr: lggr.Named(CacheServiceName),
interval: upsertInterval,
stop: make(chan struct{}, 1),
chStop: make(chan struct{}),
bridgeLastValueCache: make(map[string]BridgeResponse),
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (c *Cache) UpsertBridgeResponse(ctx context.Context, dotId string, specId i
return nil
}

func (c *Cache) Start(context.Context) error {
func (c *Cache) Start(_ context.Context) error {
return c.StartOnce(CacheServiceName, func() error {
c.wg.Add(1)

Expand All @@ -202,7 +202,7 @@ func (c *Cache) Start(context.Context) error {

func (c *Cache) Close() error {
return c.StopOnce(CacheServiceName, func() error {
close(c.stop)
close(c.chStop)
c.wg.Wait()

return nil
Expand All @@ -226,7 +226,7 @@ func (c *Cache) run() {
select {
case <-timer.C:
c.doBulkUpsert()
case <-c.stop:
case <-c.chStop:
timer.Stop()

return
Expand All @@ -243,7 +243,10 @@ func (c *Cache) doBulkUpsert() {
return
}

if err := c.ORM.BulkUpsertBridgeResponse(context.Background(), values); err != nil {
ctx, cancel := c.chStop.NewCtx()
defer cancel()

if err := c.ORM.BulkUpsertBridgeResponse(ctx, values); err != nil {
c.lggr.Warnf("bulk upsert of bridge responses failed: %s", err.Error())
}
}
Expand Down

0 comments on commit d298bb2

Please sign in to comment.