diff --git a/cron.go b/cron.go index 2f6ccb4..63b0dfe 100644 --- a/cron.go +++ b/cron.go @@ -30,11 +30,11 @@ const ( // be inspected while running. type Cron struct { namespace string + pendingOperations []func(context.Context) *Entry + operationsMutex sync.RWMutex entries map[string]*Entry - entriesMutex sync.RWMutex stop chan struct{} - add chan *Entry - delete chan string + cancel context.CancelFunc snapshot chan []*Entry etcdErrorsHandler func(context.Context, Job, error) errorsHandler func(context.Context, Job, error) @@ -161,12 +161,11 @@ func WithPartitioning(p Partitioning) CronOpt { // New returns a new Cron job runner. func New(opts ...CronOpt) (*Cron, error) { cron := &Cron{ - entries: map[string]*Entry{}, - add: make(chan *Entry), - delete: make(chan string), - stop: make(chan struct{}), - snapshot: make(chan []*Entry), - running: false, + pendingOperations: []func(context.Context) *Entry{}, + entries: map[string]*Entry{}, + stop: make(chan struct{}), + snapshot: make(chan []*Entry), + running: false, } for _, opt := range opts { opt(cron) @@ -235,23 +234,21 @@ func (c *Cron) DeleteJob(jobName string) error { } func (c *Cron) killJob(name string) { - if !c.running { + c.appendOperation(func(ctx context.Context) *Entry { _, ok := c.entries[name] if !ok { - return + return nil } delete(c.entries, name) - return - } - - c.delete <- name + return nil + }) } // GetJob retrieves a job by name. func (c *Cron) GetJob(jobName string) *Job { - c.entriesMutex.RLock() - defer c.entriesMutex.RUnlock() + c.operationsMutex.RLock() + defer c.operationsMutex.RUnlock() entry, ok := c.entries[jobName] if !ok || (entry == nil) { @@ -263,18 +260,19 @@ func (c *Cron) GetJob(jobName string) *Job { func (c *Cron) ListJobsByPrefix(prefix string) []*Job { var appJobs []*Job - for _, entry := range c.entrySnapshot() { + c.operationsMutex.RLock() + for _, entry := range c.entries { if strings.HasPrefix(entry.Job.Name, prefix) { // Job belongs to the specified app_id appJobs = append(appJobs, &entry.Job) } } + c.operationsMutex.RUnlock() return appJobs } // Schedule adds a Job to the Cron to be run on the given schedule. func (c *Cron) scheduleJob(job Job) error { - fmt.Printf("Scheduling job: %s %s\n", job.Name, job.Rhythm) s, err := Parse(job.Rhythm) if err != nil { return err @@ -297,35 +295,49 @@ func (c *Cron) schedule(schedule Schedule, job Job) error { Prev: time.Unix(0, 0), distMutexPrefix: c.organizer.TicksPath(partitionId) + "/", } - if !c.running { - c.entriesMutex.Lock() - defer c.entriesMutex.Unlock() + c.appendOperation(func(ctx context.Context) *Entry { c.entries[entry.Job.Name] = entry - return nil - } - - c.add <- entry + return entry + }) return nil } +func (c *Cron) appendOperation(op func(ctx context.Context) *Entry) { + c.operationsMutex.Lock() + defer c.operationsMutex.Unlock() + + c.pendingOperations = append(c.pendingOperations, op) +} + // Entries returns a snapshot of the cron entries. func (c *Cron) Entries() []*Entry { if c.running { - c.entriesMutex.RLock() - defer c.entriesMutex.RUnlock() - c.snapshot <- nil x := <-c.snapshot return x } - return c.entrySnapshot() + + c.operationsMutex.RLock() + defer c.operationsMutex.RUnlock() + entries := []*Entry{} + for _, e := range c.entries { + entries = append(entries, &Entry{ + Schedule: e.Schedule, + Next: e.Next, + Prev: e.Prev, + Job: e.Job, + }) + } + return entries } // Start the cron scheduler in its own go-routine. func (c *Cron) Start(ctx context.Context) { c.running = true - go c.run(ctx) + ctxWithCancel, cancel := context.WithCancel(ctx) + c.cancel = cancel + go c.run(ctxWithCancel) } // Run the scheduler.. this is private just due to the need to synchronize @@ -336,19 +348,28 @@ func (c *Cron) run(ctx context.Context) { // Figure out the next activation times for each entry. now := time.Now().Local() - changed := true + changed := make(chan bool) entries := []*Entry{} - c.entriesMutex.RLock() - for _, e := range c.entries { - e.Next = e.Schedule.Next(now) - } - c.entriesMutex.RUnlock() for { - if changed { - entries = c.entrySnapshot() - changed = false - } + go func(ctx context.Context) { + for { + hasPendingOperations := false + c.operationsMutex.RLock() + hasPendingOperations = len(c.pendingOperations) > 0 + c.operationsMutex.RUnlock() + + if hasPendingOperations { + changed <- true + } + + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + } + } + }(ctx) sort.Sort(byTime(entries)) var effective time.Time @@ -361,6 +382,26 @@ func (c *Cron) run(ctx context.Context) { } select { + case <-changed: + c.operationsMutex.Lock() + for _, op := range c.pendingOperations { + newEntry := op(ctx) + if newEntry != nil { + newEntry.Next = newEntry.Schedule.Next(now) + } + } + c.pendingOperations = []func(context.Context) *Entry{} + entries = []*Entry{} + for _, e := range c.entries { + entries = append(entries, &Entry{ + Schedule: e.Schedule, + Next: e.Next, + Prev: e.Prev, + Job: e.Job, + }) + } + c.operationsMutex.Unlock() + case now = <-time.After(effective.Sub(now)): // Run every entry whose next time was this effective time. for _, e := range entries { @@ -416,24 +457,8 @@ func (c *Cron) run(ctx context.Context) { } continue - case newEntry := <-c.add: - now = time.Now().Local() - newEntry.Next = newEntry.Schedule.Next(now) - c.entriesMutex.Lock() - c.entries[newEntry.Job.Name] = newEntry - changed = true - c.entriesMutex.Unlock() - - case name := <-c.delete: - c.entriesMutex.Lock() - delete(c.entries, name) - changed = true - c.entriesMutex.Unlock() - now = time.Now().Local() - case <-c.snapshot: - now = time.Now().Local() - c.snapshot <- c.entrySnapshot() + c.snapshot <- c.entrySnapshot(entries) case <-c.stop: c.runWaitingGroup.Done() @@ -445,17 +470,15 @@ func (c *Cron) run(ctx context.Context) { // Stop the cron scheduler. func (c *Cron) Stop() { c.stop <- struct{}{} + c.cancel() c.running = false c.runWaitingGroup.Wait() } // entrySnapshot returns a copy of the current cron entry list. -func (c *Cron) entrySnapshot() []*Entry { - c.entriesMutex.RLock() - defer c.entriesMutex.RUnlock() - +func (c *Cron) entrySnapshot(input []*Entry) []*Entry { entries := []*Entry{} - for _, e := range c.entries { + for _, e := range input { entries = append(entries, &Entry{ Schedule: e.Schedule, Next: e.Next, @@ -463,6 +486,5 @@ func (c *Cron) entrySnapshot() []*Entry { Job: e.Job, }) } - sort.Sort(byTime(entries)) return entries } diff --git a/cron_test.go b/cron_test.go index 597dcb7..995c48a 100644 --- a/cron_test.go +++ b/cron_test.go @@ -98,7 +98,7 @@ func TestAddBeforeRunning(t *testing.T) { // Start cron, add a job, expect it runs. func TestAddWhileRunning(t *testing.T) { wg := &sync.WaitGroup{} - wg.Add(1) + wg.Add(2) cron, err := New( WithNamespace(randomNamespace()), @@ -118,7 +118,7 @@ func TestAddWhileRunning(t *testing.T) { }) select { - case <-time.After(ONE_SECOND): + case <-time.After(2 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -127,7 +127,7 @@ func TestAddWhileRunning(t *testing.T) { // Test timing with Entries. func TestSnapshotEntries(t *testing.T) { wg := &sync.WaitGroup{} - wg.Add(1) + wg.Add(2) cron, err := New( WithNamespace(randomNamespace()), @@ -145,15 +145,63 @@ func TestSnapshotEntries(t *testing.T) { cron.Start(context.Background()) defer cron.Stop() - // Cron should fire in 2 seconds. After 1 second, call Entries. + // After 1 second, call Entries. select { case <-time.After(ONE_SECOND): cron.Entries() } - // Even though Entries was called, the cron should fire at the 2 second mark. + // Even though Entries was called, the cron should fire twice within 3 seconds (1 + 3). select { - case <-time.After(ONE_SECOND): + case <-time.After(3 * ONE_SECOND): + t.FailNow() + case <-wait(wg): + } +} + +// Test delayed add after un starts for a while. +func TestDelayedAdd(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + called := false + + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if s == "noop" { + return nil + } + if called { + t.Fatal("cannot call twice") + } + called = true + wg.Done() + return nil + })) + if err != nil { + t.Fatal("unexpected error") + } + + cron.AddJob(Job{ + Name: "test-noop", + Rhythm: "@every 1s", + Type: "noop", + }) + + cron.Start(context.Background()) + defer cron.Stop() + + // Artificial delay before add another record. + time.Sleep(10 * time.Second) + + cron.AddJob(Job{ + Name: "test-ev-2s", + Rhythm: "@every 2s", + }) + + // Event should be called only once within 2 seconds. + select { + case <-time.After(3 * ONE_SECOND): t.FailNow() case <-wait(wg): } diff --git a/examples/cron_example.go b/examples/cron_example.go index fcf5436..9ae6e82 100644 --- a/examples/cron_example.go +++ b/examples/cron_example.go @@ -26,11 +26,15 @@ func main() { } numHosts, err := strconv.Atoi(os.Getenv("NUM_HOSTS")) if err != nil { - numHosts = 2 + numHosts = 1 } numPartitions, err := strconv.Atoi(os.Getenv("NUM_PARTITIONS")) if err != nil { - numPartitions = 5 + numPartitions = 1 + } + namespace := os.Getenv("NAMESPACE") + if namespace == "" { + namespace = "example" } log.Printf("starting hostId=%d for total of %d hosts and %d partitions", hostId, numHosts, numPartitions) @@ -40,10 +44,10 @@ func main() { log.Fatal("fail to create partitioning", err) } cron, err := etcdcron.New( - etcdcron.WithNamespace("example"), + etcdcron.WithNamespace(namespace), etcdcron.WithPartitioning(p), etcdcron.WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error { - fmt.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) + log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) return nil }), ) @@ -67,46 +71,53 @@ func main() { if os.Getenv("ADD") == "1" { cron.AddJob(etcdcron.Job{ - Name: "error-every-2s", + Name: "every-2s-b34w5y5hbwthjs", Rhythm: "*/2 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("even error")}, + Payload: &anypb.Any{Value: []byte("ev 2s")}, }) cron.AddJob(etcdcron.Job{ - Name: "echo-every-10s", + Name: "every-10s-bnsf45354wbdsnd", Rhythm: "*/10 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("every 10 seconds")}, + Payload: &anypb.Any{Value: []byte("ev 10s")}, }) cron.AddJob(etcdcron.Job{ - Name: "error-every-3s", + Name: "every-3s-mdhgm764324rqdg", Rhythm: "*/3 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("odd error")}, + Payload: &anypb.Any{Value: []byte("ev 3s")}, }) cron.AddJob(etcdcron.Job{ - Name: "error-every-4s", + Name: "every-4s-vdafbrtjnysh245", Rhythm: "*/4 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("fourth error")}, + Payload: &anypb.Any{Value: []byte("ev 4s")}, }) cron.AddJob(etcdcron.Job{ - Name: "error-every-5s", + Name: "every-5s-adjbg43q5rbafbr44", Rhythm: "*/5 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("fifth error")}, + Payload: &anypb.Any{Value: []byte("ev 5s")}, }) cron.AddJob(etcdcron.Job{ - Name: "error-every-6s", + Name: "every-6s-abadfh52jgdyj467", Rhythm: "*/6 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("sixth error")}, + Payload: &anypb.Any{Value: []byte("ev 6s")}, }) cron.AddJob(etcdcron.Job{ - Name: "error-every-7s", + Name: "every-7s-bndasfbn4q55fgn", Rhythm: "*/7 * * * * *", Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("seventh error")}, + Payload: &anypb.Any{Value: []byte("ev 7s")}, + }) + cron.AddJob(etcdcron.Job{ + Name: "every-1s-then-expire-hadfh452erhh", + Rhythm: "*/1 * * * * *", + Type: "stdout", // can be anything the client wants + TTL: 10, + Payload: &anypb.Any{Value: []byte("ev 1s then expires after 10s")}, }) } cron.Start(context.Background()) diff --git a/store.go b/store.go index a0eb607..63f8f2a 100644 --- a/store.go +++ b/store.go @@ -8,6 +8,7 @@ package etcdcron import ( "context" "fmt" + "log" "path/filepath" "github.com/pkg/errors" @@ -168,21 +169,27 @@ func (s *etcdStore) notifyDelete(ctx context.Context, name string, callback func func (s *etcdStore) sync(ctx context.Context, prefix string, syncer mirror.Syncer) { go func() { - fmt.Printf("Started sync for partition: %s\n", prefix) + log.Printf("Started sync for path: %s\n", prefix) wc := syncer.SyncUpdates(ctx) - for wr := range wc { - for _, ev := range wr.Events { - switch ev.Type { - case mvccpb.PUT: - s.notifyPut(ctx, ev.Kv, s.putCallback) - case mvccpb.DELETE: - _, name := filepath.Split(string(ev.Kv.Key)) - s.notifyDelete(ctx, name, s.deleteCallback) - default: - panic("unexpected etcd event type") + done := false + for !done { + select { + case <-ctx.Done(): + done = true + case wr := <-wc: + for _, ev := range wr.Events { + switch ev.Type { + case mvccpb.PUT: + s.notifyPut(ctx, ev.Kv, s.putCallback) + case mvccpb.DELETE: + _, name := filepath.Split(string(ev.Kv.Key)) + s.notifyDelete(ctx, name, s.deleteCallback) + default: + panic("unexpected etcd event type") + } } } } - fmt.Printf("Exited sync for partition: %s\n", prefix) + log.Printf("Exited sync for path: %s\n", prefix) }() }