From 8c723095a20183711b4a403a8112264ebba8841e Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Sun, 24 Nov 2024 21:40:23 -0600 Subject: [PATCH] stop using runner manager bc it was killing all runners which was causing super confusing issues Signed-off-by: Cassandra Coyle --- cron/cron.go | 138 ++++++-- cron/cron_test.go | 502 ++++++++++++++++++++++++++++++ internal/engine/engine.go | 68 ++-- internal/engine/engine_test.go | 4 +- internal/garbage/collector.go | 1 + internal/leadership/leadership.go | 19 +- internal/queue/queue.go | 4 + tests/framework/fake/fake.go | 14 - 8 files changed, 672 insertions(+), 78 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index 0987415..9b0c008 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -13,7 +13,6 @@ import ( "sync/atomic" "time" - "github.com/dapr/kit/concurrency" "github.com/go-logr/logr" "github.com/go-logr/zapr" "go.etcd.io/etcd/client/pkg/v3/logutil" @@ -70,6 +69,7 @@ type Options struct { // cron is the implementation of the cron interface. type cron struct { log logr.Logger + wg sync.WaitGroup key *key.Key leadership *leadership.Leadership @@ -78,11 +78,12 @@ type cron struct { triggerFn api.TriggerFunction gcgInterval *time.Duration - lock sync.RWMutex - engine atomic.Pointer[engine.Engine] - running atomic.Bool - readyCh chan struct{} - closeCh chan struct{} + lock sync.RWMutex + engine atomic.Pointer[engine.Engine] + running atomic.Bool + readyCh chan struct{} + closeCh chan struct{} + restartingCh chan struct{} } // New creates a new cron instance. @@ -131,15 +132,17 @@ func New(opts Options) (api.Interface, error) { }) return &cron{ - log: log, - key: key, - client: client, - part: part, - triggerFn: opts.TriggerFn, - gcgInterval: opts.CounterGarbageCollectionInterval, - leadership: leadership, - readyCh: make(chan struct{}), - closeCh: make(chan struct{}), + log: log, + wg: sync.WaitGroup{}, + key: key, + client: client, + part: part, + triggerFn: opts.TriggerFn, + gcgInterval: opts.CounterGarbageCollectionInterval, + leadership: leadership, + readyCh: make(chan struct{}), + closeCh: make(chan struct{}), + restartingCh: make(chan struct{}), }, nil } @@ -151,10 +154,45 @@ func (c *cron) Run(ctx context.Context) error { defer close(c.closeCh) - err := concurrency.NewRunnerManager( - c.leadership.Run, - func(ctx context.Context) error { - for { + errCh := make(chan error, 2) + defer close(errCh) + + c.wg.Add(2) + defer c.wg.Wait() + + go func(ctx context.Context) { + defer c.wg.Done() + if err := c.leadership.Run(ctx); err != nil { + if errors.Is(err, context.Canceled) { + // Ignore context cancellation errors + return + } + select { + case errCh <- err: + case <-ctx.Done(): + } + return + } + return + }(ctx) + + engineCtx, engineCancel := context.WithCancel(ctx) + defer engineCancel() + + go func(ctx context.Context) { + defer c.wg.Done() + for { + leadershipCtx, leadershipCancel := context.WithCancel(ctx) + + select { + case <-ctx.Done(): + leadershipCancel() + return + default: + c.lock.Lock() + c.restartingCh = make(chan struct{}) + c.lock.Unlock() + engine, err := engine.New(engine.Options{ Log: c.log, Key: c.key, @@ -164,42 +202,74 @@ func (c *cron) Run(ctx context.Context) error { CounterGarbageCollectionInterval: c.gcgInterval, }) if err != nil { - return fmt.Errorf("failed to create engine: %w", err) + leadershipCancel() + select { + case errCh <- err: + case <-ctx.Done(): + } + return } - ectx, err := c.leadership.WaitForLeadership(ctx) + ectx, err := c.leadership.WaitForLeadership(leadershipCtx) if err != nil { - return err + leadershipCancel() + select { + case errCh <- err: + case <-ctx.Done(): + } + return } + // Store the engine once ready c.lock.Lock() c.engine.Store(engine) close(c.readyCh) c.lock.Unlock() + // Run engine with leadership context if err := engine.Run(ectx); err != nil { - return err - } - - if err := ctx.Err(); err != nil { - return err + leadershipCancel() + select { + case errCh <- err: + case <-ctx.Done(): + } + return } - c.log.Info("restarting engine") + // Restart engine loop + c.log.Info("Restarting engine due to leadership change") c.lock.Lock() + close(c.restartingCh) c.readyCh = make(chan struct{}) c.lock.Unlock() + + // Check for cancellation again + if err := ctx.Err(); err != nil { + leadershipCancel() + select { + case errCh <- err: + case <-ctx.Done(): + } + return + } } - }, - ).Run(ctx) - if err != nil { - return err - } + } + }(engineCtx) + <-ctx.Done() // block until cron is done c.log.Info("cron shutdown gracefully") - return nil + select { + case err := <-errCh: + if errors.Is(err, context.Canceled) { + // Ignore context cancellation errors + return nil + } + return err + default: + return nil + } } // Add forwards the call to the embedded API. diff --git a/cron/cron_test.go b/cron/cron_test.go index c57eba6..62d4e58 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -7,16 +7,20 @@ package cron import ( "context" + "sync" "sync/atomic" "testing" "time" + "github.com/dapr/kit/ptr" "github.com/go-logr/logr" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/wrapperspb" "github.com/diagridio/go-etcd-cron/api" + "github.com/diagridio/go-etcd-cron/internal/api/stored" "github.com/diagridio/go-etcd-cron/tests/framework/etcd" ) @@ -77,4 +81,502 @@ func Test_Run(t *testing.T) { t.Fatal("timed out waiting Run response") } }) + + t.Run("cron engine remains ready after leadership change that keeps partition total the same", func(t *testing.T) { + t.Parallel() + + replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data"))) + require.NoError(t, err) + client := etcd.EmbeddedBareClient(t) + + cronI, err := New(Options{ + Log: logr.Discard(), + Client: client, + Namespace: "abc", + PartitionID: 0, + PartitionTotal: 10, + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + ReplicaData: replicaData, + }) + require.NoError(t, err) + cron := cronI.(*cron) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error) + + go func() { + errCh <- cronI.Run(ctx) + }() + + // wait until ready + select { + case <-cron.readyCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + leadershipData, err := proto.Marshal(&stored.Leadership{ + Total: 10, + ReplicaData: replicaData, + }) + require.NoError(t, err, "failed to marshal leadership data") + + _, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData)) + require.NoError(t, err, "failed to insert leadership data into etcd") + + // wait until ready + select { + case <-cron.readyCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + // confirm cron is ready + err = cronI.Add(context.Background(), "a123", &api.Job{ + DueTime: ptr.Of("10s"), + }) + require.NoError(t, err) + + cancel() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting Run response") + } + }) + + t.Run("cron engine should show restarting when leadership changes, ensure proper close", func(t *testing.T) { + t.Parallel() + + replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data"))) + require.NoError(t, err) + client := etcd.EmbeddedBareClient(t) + + cronI, err := New(Options{ + Log: logr.Discard(), + Client: client, + Namespace: "abc", + PartitionID: 0, + PartitionTotal: 10, + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + ReplicaData: replicaData, + }) + require.NoError(t, err) + cron := cronI.(*cron) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error) + + go func() { + errCh <- cronI.Run(ctx) + }() + + cron.lock.RLock() + readyCh := cron.readyCh + cron.lock.RUnlock() + + // wait until ready + select { + case <-readyCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + leadershipData, err := proto.Marshal(&stored.Leadership{ + Total: 5, + ReplicaData: replicaData, + }) + require.NoError(t, err, "failed to marshal leadership data") + + childCtx, childCancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + cron.lock.RLock() + restartingCh := cron.restartingCh + cron.lock.RUnlock() + + restartingDetected := make(chan struct{}) + // Monitor cron restarting + go func(ctx context.Context) { + defer childCancel() + defer wg.Done() + + select { + case <-restartingCh: + close(restartingDetected) + case <-ctx.Done(): + } + }(childCtx) + + _, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData)) + require.NoError(t, err, "failed to insert leadership data into etcd") + + select { + case <-restartingCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + cancel() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting Run response") + } + }) + + t.Run("cron should not be ready while restarting the engine", func(t *testing.T) { + t.Parallel() + + replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data"))) + require.NoError(t, err) + client := etcd.EmbeddedBareClient(t) + + cronI, err := New(Options{ + Log: logr.Discard(), + Client: client, + Namespace: "abc", + PartitionID: 0, + PartitionTotal: 10, + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + ReplicaData: replicaData, + }) + require.NoError(t, err) + cron := cronI.(*cron) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error) + + go func() { + errCh <- cronI.Run(ctx) + }() + + cron.lock.RLock() + readyCh := cron.readyCh + cron.lock.RUnlock() + + // wait until ready + select { + case <-readyCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + leadershipData, err := proto.Marshal(&stored.Leadership{ + Total: 5, + ReplicaData: replicaData, + }) + require.NoError(t, err, "failed to marshal leadership data") + + childCtx, childCancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + cron.lock.RLock() + restartingCh := cron.restartingCh + cron.lock.RUnlock() + + restartingDetected := make(chan struct{}) + // Monitor cron restarting + go func(ctx context.Context) { + defer childCancel() + defer wg.Done() + + select { + case <-restartingCh: + close(restartingDetected) + case <-ctx.Done(): + } + }(childCtx) + + _, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData)) + require.NoError(t, err, "failed to insert leadership data into etcd") + + select { + case <-restartingCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + cron.lock.Lock() + newReadyCh := cron.readyCh + cron.lock.Unlock() + + select { + case <-newReadyCh: + t.Fatal("cron should not be ready while restarting") + case <-time.After(2 * time.Second): + } + + cancel() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting Run response") + } + }) + + t.Run("cron engine should gate until leadership ready", func(t *testing.T) { + t.Parallel() + + replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data"))) + require.NoError(t, err) + client := etcd.EmbeddedBareClient(t) + + cronI, err := New(Options{ + Log: logr.Discard(), + Client: client, + Namespace: "abc", + PartitionID: 0, + PartitionTotal: 10, + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + ReplicaData: replicaData, + }) + require.NoError(t, err) + cron := cronI.(*cron) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error) + + go func() { + errCh <- cronI.Run(ctx) + }() + + cron.lock.RLock() + readyCh := cron.readyCh + cron.lock.RUnlock() + + // wait until ready + select { + case <-readyCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + // intentionally incorrect total + leadershipData, err := proto.Marshal(&stored.Leadership{ + Total: 5, + ReplicaData: replicaData, + }) + require.NoError(t, err, "failed to marshal leadership data") + + childCtx, childCancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + cron.lock.RLock() + restartingCh := cron.restartingCh + cron.lock.RUnlock() + + restartingDetected := make(chan struct{}) + // Monitor cron restarting + go func(ctx context.Context) { + defer childCancel() + defer wg.Done() + + select { + case <-restartingCh: + close(restartingDetected) + case <-ctx.Done(): + } + }(childCtx) + + _, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData)) + require.NoError(t, err, "failed to insert leadership data into etcd") + + // cron engine restarting + select { + case <-restartingCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + cron.lock.Lock() + newReadyCh := cron.readyCh + cron.lock.Unlock() + + select { + case <-newReadyCh: + t.Fatal("cron should not be ready while restarting") + case <-time.After(2 * time.Second): + } + + // Ensure restartingCh is reset eventually + require.Eventually(t, func() bool { + cron.lock.RLock() + defer cron.lock.RUnlock() + return cron.restartingCh != nil + }, 1*time.Second, 100*time.Millisecond, "cron restartingCh was not reset") + + // double check cron does not become ready + cron.lock.RLock() + finalReadyCh := cron.readyCh + cron.lock.RUnlock() + + select { + case <-finalReadyCh: + t.Fatal("cron became ready unexpectedly after invalid leadership data change. leadership should not be ready and should gate cron being ready") + case <-time.After(2 * time.Second): + } + + cancel() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting Run response") + } + }) + + t.Run("cron engine should become ready again after proper leadership change", func(t *testing.T) { + t.Parallel() + + replicaData, err := anypb.New(wrapperspb.Bytes([]byte("data"))) + require.NoError(t, err) + client := etcd.EmbeddedBareClient(t) + + cronI, err := New(Options{ + Log: logr.Discard(), + Client: client, + Namespace: "abc", + PartitionID: 0, + PartitionTotal: 10, + TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + ReplicaData: replicaData, + }) + require.NoError(t, err) + cron := cronI.(*cron) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error) + + go func() { + errCh <- cronI.Run(ctx) + }() + + cron.lock.RLock() + readyCh := cron.readyCh + cron.lock.RUnlock() + + // wait until ready + select { + case <-readyCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + // intentionally incorrect total + leadershipData, err := proto.Marshal(&stored.Leadership{ + Total: 5, + ReplicaData: replicaData, + }) + require.NoError(t, err, "failed to marshal leadership data") + + childCtx, childCancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + cron.lock.RLock() + restartingCh := cron.restartingCh + cron.lock.RUnlock() + + restartingDetected := make(chan struct{}) + // Monitor cron restarting + go func(ctx context.Context) { + defer childCancel() + defer wg.Done() + + select { + case <-restartingCh: + close(restartingDetected) + case <-ctx.Done(): + } + }(childCtx) + + _, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData)) + require.NoError(t, err, "failed to insert leadership data into etcd") + + select { + case <-restartingCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for cron to be ready") + } + + cron.lock.Lock() + newReadyCh := cron.readyCh + cron.lock.Unlock() + + select { + case <-newReadyCh: + t.Fatal("cron should not be ready while restarting") + case <-time.After(2 * time.Second): + } + + // Ensure restartingCh is reset eventually + require.Eventually(t, func() bool { + cron.lock.RLock() + defer cron.lock.RUnlock() + return cron.restartingCh != nil + }, 1*time.Second, 100*time.Millisecond, "cron restartingCh was not reset") + + // Update leadership data to match partition total + correctLeadershipData, err := proto.Marshal(&stored.Leadership{ + Total: 10, + ReplicaData: replicaData, + }) + require.NoError(t, err) + + _, err = client.Put(context.Background(), "abc/leadership/1", string(correctLeadershipData)) + require.NoError(t, err, "failed to update leadership data in etcd") + + // Wait for the cron to become ready again + cron.lock.RLock() + finalReadyCh := cron.readyCh + cron.lock.RUnlock() + + select { + case <-finalReadyCh: + case <-time.After(2 * time.Second): + t.Fatal("cron did not become ready again after restart") + } + + cancel() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting Run response") + } + }) } diff --git a/internal/engine/engine.go b/internal/engine/engine.go index a0d636b..ca11b0a 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -9,10 +9,10 @@ import ( "context" "errors" "fmt" + "sync" "sync/atomic" "time" - "github.com/dapr/kit/concurrency" "github.com/go-logr/logr" "k8s.io/utils/clock" @@ -64,6 +64,7 @@ type Engine struct { informer *informer.Informer api internalapi.Interface running atomic.Bool + wg sync.WaitGroup } func New(opts Options) (*Engine, error) { @@ -112,6 +113,7 @@ func New(opts Options) (*Engine, error) { queue: queue, informer: informer, api: api, + wg: sync.WaitGroup{}, }, nil } @@ -124,29 +126,55 @@ func (e *Engine) Run(ctx context.Context) error { e.log.Info("starting cron engine") defer e.log.Info("cron engine shut down") - return concurrency.NewRunnerManager( - e.collector.Run, - e.queue.Run, - e.informer.Run, - e.api.Run, - func(ctx context.Context) error { - ev, err := e.informer.Events() - if err != nil { - return err - } + errCh := make(chan error, 5) + defer close(errCh) + + e.wg.Add(4) + + go func() { defer e.wg.Done(); errCh <- e.collector.Run(ctx) }() + go func() { defer e.wg.Done(); errCh <- e.queue.Run(ctx) }() + go func() { defer e.wg.Done(); errCh <- e.informer.Run(ctx) }() + go func() { defer e.wg.Done(); errCh <- e.api.Run(ctx) }() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case event := <-ev: - if err := e.queue.HandleInformerEvent(ctx, event); err != nil { - return err + e.wg.Add(1) + go func(ctx context.Context) { + defer e.wg.Done() + + ev, err := e.informer.Events() + if err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + return + } + + for { + select { + case <-ctx.Done(): + return + case event := <-ev: + if err := e.queue.HandleInformerEvent(ctx, event); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): } + return } } - }, - ).Run(ctx) + } + }(ctx) + + <-ctx.Done() + + e.wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } } func (e *Engine) API() internalapi.Interface { diff --git a/internal/engine/engine_test.go b/internal/engine/engine_test.go index 6f41c21..a159744 100644 --- a/internal/engine/engine_test.go +++ b/internal/engine/engine_test.go @@ -90,10 +90,8 @@ func Test_Run(t *testing.T) { time.Sleep(100 * time.Millisecond) cancel() - err = <-errCh - // runner manager returns nil here - assert.NoError(t, err) + assert.NoError(t, <-errCh) }) t.Run("multiple starts", func(t *testing.T) { diff --git a/internal/garbage/collector.go b/internal/garbage/collector.go index 26aab3e..2328d9a 100644 --- a/internal/garbage/collector.go +++ b/internal/garbage/collector.go @@ -90,6 +90,7 @@ func (c *collector) Run(ctx context.Context) error { if !c.running.CompareAndSwap(false, true) { return errors.New("garbage collector is already running") } + c.log.Info("starting garbage collector") defer c.closed.Store(true) for { diff --git a/internal/leadership/leadership.go b/internal/leadership/leadership.go index 327b4f1..56d60c2 100644 --- a/internal/leadership/leadership.go +++ b/internal/leadership/leadership.go @@ -92,9 +92,12 @@ func (l *Leadership) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() default: - if err := l.loop(ctx); err != nil { + loopCtx, loopCancel := context.WithCancel(ctx) + if err := l.loop(loopCtx); err != nil { + loopCancel() return err } + loopCancel() } } } @@ -188,7 +191,7 @@ func (l *Leadership) loop(ctx context.Context) error { } } - l.log.Info("All partition leadership keys match partition total, processing is ready") + l.log.Info("All partition leadership keys match partition total, leadership is ready") l.lock.Lock() close(l.readyCh) l.lock.Unlock() @@ -197,6 +200,7 @@ func (l *Leadership) loop(ctx context.Context) error { for { select { case <-ctx.Done(): + break case <-ch: } @@ -213,7 +217,6 @@ func (l *Leadership) loop(ctx context.Context) error { break } } - l.lock.Lock() defer l.lock.Unlock() @@ -288,6 +291,8 @@ func (l *Leadership) checkLeadershipKeys(ctx context.Context) (bool, error) { // it does not exist. // If it does exist, and we successfully wrote the leadership key, it will return true. func (l *Leadership) attemptPartitionLeadership(ctx context.Context, leaseID clientv3.LeaseID) (bool, error) { + leaderCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() leaderBytes, err := proto.Marshal(&stored.Leadership{ Total: l.partitionTotal, ReplicaData: l.replicaData, @@ -296,7 +301,7 @@ func (l *Leadership) attemptPartitionLeadership(ctx context.Context, leaseID cli return false, fmt.Errorf("failed to marshal leadership data: %w", err) } - tx := l.client.Txn(ctx). + tx := l.client.Txn(leaderCtx). If(clientv3.Compare(clientv3.CreateRevision(l.key.LeadershipKey()), "=", 0)). Then(clientv3.OpPut(l.key.LeadershipKey(), string(leaderBytes), clientv3.WithLease(leaseID))) resp, err := tx.Commit() @@ -333,17 +338,17 @@ func (l *Leadership) WaitForLeadership(ctx context.Context) (context.Context, er leaderCtx, cancel := context.WithCancel(ctx) l.wg.Add(1) - go func() { + go func(ctx context.Context) { defer l.wg.Done() defer cancel() select { - case <-ctx.Done(): + case <-leaderCtx.Done(): case <-closeCh: case <-changeCh: // Leadership change detected; cancel context to signal leadership shift } - }() + }(ctx) return leaderCtx, nil } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 82e604f..7a8bdac 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -132,6 +132,10 @@ func (q *Queue) Run(ctx context.Context) error { q.queue = queue.NewProcessor[string, counter.Interface](q.executeFn(ctx)).WithClock(q.clock) close(q.readyCh) + + q.log.Info("queue is ready") + defer q.log.Info("shutting down queue") + <-ctx.Done() q.queue.Close() return nil diff --git a/tests/framework/fake/fake.go b/tests/framework/fake/fake.go index 921bf75..8261167 100644 --- a/tests/framework/fake/fake.go +++ b/tests/framework/fake/fake.go @@ -9,7 +9,6 @@ import ( "context" "github.com/diagridio/go-etcd-cron/api" - "google.golang.org/protobuf/types/known/anypb" ) // Fake is a fake cron instance used for testing. @@ -24,19 +23,6 @@ type Fake struct { deliverablePrefixesFn func(ctx context.Context, prefixes ...string) (context.CancelFunc, error) } -func (f *Fake) Close() { -} - -func (f *Fake) SetReady() { -} - -func (f *Fake) SetUnready() { -} - -func (f *Fake) WatchLeadership(ctx context.Context) (chan []*anypb.Any, error) { - panic("implement me") -} - func New() *Fake { return &Fake{ runFn: func(ctx context.Context) error {