Skip to content

Commit

Permalink
Some random fixes.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <[email protected]>
  • Loading branch information
artursouza committed Mar 11, 2024
1 parent 2ea4fd4 commit 3e3bda4
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 100 deletions.
150 changes: 86 additions & 64 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ const (
// be inspected while running.
type Cron struct {
namespace string
pendingOperations []func(context.Context) *Entry
operationsMutex sync.RWMutex
entries map[string]*Entry
entriesMutex sync.RWMutex
stop chan struct{}
add chan *Entry
delete chan string
cancel context.CancelFunc
snapshot chan []*Entry
etcdErrorsHandler func(context.Context, Job, error)
errorsHandler func(context.Context, Job, error)
Expand Down Expand Up @@ -161,12 +161,11 @@ func WithPartitioning(p Partitioning) CronOpt {
// New returns a new Cron job runner.
func New(opts ...CronOpt) (*Cron, error) {
cron := &Cron{
entries: map[string]*Entry{},
add: make(chan *Entry),
delete: make(chan string),
stop: make(chan struct{}),
snapshot: make(chan []*Entry),
running: false,
pendingOperations: []func(context.Context) *Entry{},
entries: map[string]*Entry{},
stop: make(chan struct{}),
snapshot: make(chan []*Entry),
running: false,
}
for _, opt := range opts {
opt(cron)
Expand Down Expand Up @@ -235,23 +234,21 @@ func (c *Cron) DeleteJob(jobName string) error {
}

func (c *Cron) killJob(name string) {
if !c.running {
c.appendOperation(func(ctx context.Context) *Entry {
_, ok := c.entries[name]
if !ok {
return
return nil
}

delete(c.entries, name)
return
}

c.delete <- name
return nil
})
}

// GetJob retrieves a job by name.
func (c *Cron) GetJob(jobName string) *Job {
c.entriesMutex.RLock()
defer c.entriesMutex.RUnlock()
c.operationsMutex.RLock()
defer c.operationsMutex.RUnlock()

entry, ok := c.entries[jobName]
if !ok || (entry == nil) {
Expand All @@ -263,18 +260,19 @@ func (c *Cron) GetJob(jobName string) *Job {

func (c *Cron) ListJobsByPrefix(prefix string) []*Job {
var appJobs []*Job
for _, entry := range c.entrySnapshot() {
c.operationsMutex.RLock()
for _, entry := range c.entries {
if strings.HasPrefix(entry.Job.Name, prefix) {
// Job belongs to the specified app_id
appJobs = append(appJobs, &entry.Job)
}
}
c.operationsMutex.RUnlock()
return appJobs
}

// Schedule adds a Job to the Cron to be run on the given schedule.
func (c *Cron) scheduleJob(job Job) error {
fmt.Printf("Scheduling job: %s %s\n", job.Name, job.Rhythm)
s, err := Parse(job.Rhythm)
if err != nil {
return err
Expand All @@ -297,35 +295,49 @@ func (c *Cron) schedule(schedule Schedule, job Job) error {
Prev: time.Unix(0, 0),
distMutexPrefix: c.organizer.TicksPath(partitionId) + "/",
}
if !c.running {
c.entriesMutex.Lock()
defer c.entriesMutex.Unlock()

c.appendOperation(func(ctx context.Context) *Entry {
c.entries[entry.Job.Name] = entry
return nil
}

c.add <- entry
return entry
})
return nil
}

func (c *Cron) appendOperation(op func(ctx context.Context) *Entry) {
c.operationsMutex.Lock()
defer c.operationsMutex.Unlock()

c.pendingOperations = append(c.pendingOperations, op)
}

// Entries returns a snapshot of the cron entries.
func (c *Cron) Entries() []*Entry {
if c.running {
c.entriesMutex.RLock()
defer c.entriesMutex.RUnlock()

c.snapshot <- nil
x := <-c.snapshot
return x
}
return c.entrySnapshot()

c.operationsMutex.RLock()
defer c.operationsMutex.RUnlock()
entries := []*Entry{}
for _, e := range c.entries {
entries = append(entries, &Entry{
Schedule: e.Schedule,
Next: e.Next,
Prev: e.Prev,
Job: e.Job,
})
}
return entries
}

// Start the cron scheduler in its own go-routine.
func (c *Cron) Start(ctx context.Context) {
c.running = true
go c.run(ctx)
ctxWithCancel, cancel := context.WithCancel(ctx)
c.cancel = cancel
go c.run(ctxWithCancel)
}

// Run the scheduler.. this is private just due to the need to synchronize
Expand All @@ -336,19 +348,28 @@ func (c *Cron) run(ctx context.Context) {
// Figure out the next activation times for each entry.
now := time.Now().Local()

changed := true
changed := make(chan bool)
entries := []*Entry{}
c.entriesMutex.RLock()
for _, e := range c.entries {
e.Next = e.Schedule.Next(now)
}
c.entriesMutex.RUnlock()

for {
if changed {
entries = c.entrySnapshot()
changed = false
}
go func(ctx context.Context) {
for {
hasPendingOperations := false
c.operationsMutex.RLock()
hasPendingOperations = len(c.pendingOperations) > 0
c.operationsMutex.RUnlock()

if hasPendingOperations {
changed <- true
}

select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
}
}(ctx)
sort.Sort(byTime(entries))

var effective time.Time
Expand All @@ -361,6 +382,26 @@ func (c *Cron) run(ctx context.Context) {
}

select {
case <-changed:
c.operationsMutex.Lock()
for _, op := range c.pendingOperations {
newEntry := op(ctx)
if newEntry != nil {
newEntry.Next = newEntry.Schedule.Next(now)
}
}
c.pendingOperations = []func(context.Context) *Entry{}
entries = []*Entry{}
for _, e := range c.entries {
entries = append(entries, &Entry{
Schedule: e.Schedule,
Next: e.Next,
Prev: e.Prev,
Job: e.Job,
})
}
c.operationsMutex.Unlock()

case now = <-time.After(effective.Sub(now)):
// Run every entry whose next time was this effective time.
for _, e := range entries {
Expand Down Expand Up @@ -416,24 +457,8 @@ func (c *Cron) run(ctx context.Context) {
}
continue

case newEntry := <-c.add:
now = time.Now().Local()
newEntry.Next = newEntry.Schedule.Next(now)
c.entriesMutex.Lock()
c.entries[newEntry.Job.Name] = newEntry
changed = true
c.entriesMutex.Unlock()

case name := <-c.delete:
c.entriesMutex.Lock()
delete(c.entries, name)
changed = true
c.entriesMutex.Unlock()
now = time.Now().Local()

case <-c.snapshot:
now = time.Now().Local()
c.snapshot <- c.entrySnapshot()
c.snapshot <- c.entrySnapshot(entries)

case <-c.stop:
c.runWaitingGroup.Done()
Expand All @@ -445,24 +470,21 @@ func (c *Cron) run(ctx context.Context) {
// Stop the cron scheduler.
func (c *Cron) Stop() {
c.stop <- struct{}{}
c.cancel()
c.running = false
c.runWaitingGroup.Wait()
}

// entrySnapshot returns a copy of the current cron entry list.
func (c *Cron) entrySnapshot() []*Entry {
c.entriesMutex.RLock()
defer c.entriesMutex.RUnlock()

func (c *Cron) entrySnapshot(input []*Entry) []*Entry {
entries := []*Entry{}
for _, e := range c.entries {
for _, e := range input {
entries = append(entries, &Entry{
Schedule: e.Schedule,
Next: e.Next,
Prev: e.Prev,
Job: e.Job,
})
}
sort.Sort(byTime(entries))
return entries
}
60 changes: 54 additions & 6 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestAddBeforeRunning(t *testing.T) {
// Start cron, add a job, expect it runs.
func TestAddWhileRunning(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
wg.Add(2)

cron, err := New(
WithNamespace(randomNamespace()),
Expand All @@ -118,7 +118,7 @@ func TestAddWhileRunning(t *testing.T) {
})

select {
case <-time.After(ONE_SECOND):
case <-time.After(2 * ONE_SECOND):
t.FailNow()
case <-wait(wg):
}
Expand All @@ -127,7 +127,7 @@ func TestAddWhileRunning(t *testing.T) {
// Test timing with Entries.
func TestSnapshotEntries(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
wg.Add(2)

cron, err := New(
WithNamespace(randomNamespace()),
Expand All @@ -145,15 +145,63 @@ func TestSnapshotEntries(t *testing.T) {
cron.Start(context.Background())
defer cron.Stop()

// Cron should fire in 2 seconds. After 1 second, call Entries.
// After 1 second, call Entries.
select {
case <-time.After(ONE_SECOND):
cron.Entries()
}

// Even though Entries was called, the cron should fire at the 2 second mark.
// Even though Entries was called, the cron should fire twice within 3 seconds (1 + 3).
select {
case <-time.After(ONE_SECOND):
case <-time.After(3 * ONE_SECOND):
t.FailNow()
case <-wait(wg):
}
}

// Test delayed add after un starts for a while.
func TestDelayedAdd(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
called := false

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
if s == "noop" {
return nil
}
if called {
t.Fatal("cannot call twice")
}
called = true
wg.Done()
return nil
}))
if err != nil {
t.Fatal("unexpected error")
}

cron.AddJob(Job{
Name: "test-noop",
Rhythm: "@every 1s",
Type: "noop",
})

cron.Start(context.Background())
defer cron.Stop()

// Artificial delay before add another record.
time.Sleep(10 * time.Second)

cron.AddJob(Job{
Name: "test-ev-2s",
Rhythm: "@every 2s",
})

// Event should be called only once within 2 seconds.
select {
case <-time.After(3 * ONE_SECOND):
t.FailNow()
case <-wait(wg):
}
Expand Down
Loading

0 comments on commit 3e3bda4

Please sign in to comment.