diff --git a/core/bridges/cache.go b/core/bridges/cache.go index b65a1667519..4b5a6552447 100644 --- a/core/bridges/cache.go +++ b/core/bridges/cache.go @@ -30,8 +30,8 @@ type Cache struct { // service state services.StateMachine - wg sync.WaitGroup - stop chan struct{} + wg sync.WaitGroup + chStop services.StopChan // data state bridgeTypesCache sync.Map @@ -47,7 +47,7 @@ func NewCache(base ORM, lggr logger.Logger, upsertInterval time.Duration) *Cache ORM: base, lggr: lggr.Named(CacheServiceName), interval: upsertInterval, - stop: make(chan struct{}, 1), + chStop: make(chan struct{}), bridgeLastValueCache: make(map[string]BridgeResponse), } } @@ -190,7 +190,7 @@ func (c *Cache) UpsertBridgeResponse(ctx context.Context, dotId string, specId i return nil } -func (c *Cache) Start(context.Context) error { +func (c *Cache) Start(_ context.Context) error { return c.StartOnce(CacheServiceName, func() error { c.wg.Add(1) @@ -202,7 +202,7 @@ func (c *Cache) Start(context.Context) error { func (c *Cache) Close() error { return c.StopOnce(CacheServiceName, func() error { - close(c.stop) + close(c.chStop) c.wg.Wait() return nil @@ -226,7 +226,7 @@ func (c *Cache) run() { select { case <-timer.C: c.doBulkUpsert() - case <-c.stop: + case <-c.chStop: timer.Stop() return @@ -243,7 +243,10 @@ func (c *Cache) doBulkUpsert() { return } - if err := c.ORM.BulkUpsertBridgeResponse(context.Background(), values); err != nil { + ctx, cancel := c.chStop.NewCtx() + defer cancel() + + if err := c.ORM.BulkUpsertBridgeResponse(ctx, values); err != nil { c.lggr.Warnf("bulk upsert of bridge responses failed: %s", err.Error()) } }