Skip to content

Commit

Permalink
Support "Dapr" format for schedule. (#8)
Browse files Browse the repository at this point in the history
* Support "Dapr" format for schedule.

Signed-off-by: Artur Souza <[email protected]>

* Remove extra go routines for error handling callback.

Signed-off-by: Artur Souza <[email protected]>

* Use require.NoError() in cron_test

Signed-off-by: Artur Souza <[email protected]>

* Assert error on AddJob for cron_test.

Signed-off-by: Artur Souza <[email protected]>

* Use Eventually

Signed-off-by: Artur Souza <[email protected]>

* Extra struct for calendar constructor.

Signed-off-by: Artur Souza <[email protected]>

---------

Signed-off-by: Artur Souza <[email protected]>
  • Loading branch information
artursouza authored Apr 3, 2024
1 parent c0f44b7 commit eab9ba8
Show file tree
Hide file tree
Showing 12 changed files with 998 additions and 164 deletions.
136 changes: 121 additions & 15 deletions counting/counting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

const defaultEtcdEndpoint = "127.0.0.1:2379"

func TestCounterTTL(t *testing.T) {
func TestCounterIncrement(t *testing.T) {
ctx := context.TODO()
organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning())
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Expand All @@ -29,44 +29,150 @@ func TestCounterTTL(t *testing.T) {

key := organizer.CounterPath(0, "count")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, time.Second)
counter := NewEtcdCounter(etcdClient, key, 0, time.Duration(0))

value, updated, err := counter.Add(ctx, 1, time.Now().Add(time.Second))
value, updated, err := counter.Increment(ctx, 1)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 1, value)

value, updated, err = counter.Add(ctx, 2, time.Now().Add(time.Second))
value, updated, err = counter.Increment(ctx, 2)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 3, value)

value, updated, err = counter.Increment(ctx, -4)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, -1, value)

// Deletes in the database.
err = counter.Delete(ctx)
assert.NoError(t, err)

// Counter deleted but the in-memory value continues.
// Even if key is deleted in the db, a new operation will set it again.
value, updated, err = counter.Increment(ctx, 0)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, -1, value)

// Counter is deleted in db again.
err = counter.Delete(ctx)
assert.NoError(t, err)

// A new instance will start from 0 since the db record does not exist.
counter = NewEtcdCounter(etcdClient, key, 0, time.Duration(0))
value, updated, err = counter.Increment(ctx, 0)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 0, value)
}

// TestCounterDecrement verifies that a counter seeded with an initial value
// can be decremented (including below zero), that deleting the db record does
// not reset the in-memory value, and that a fresh instance restarts from the
// initial value once the db record is gone.
func TestCounterDecrement(t *testing.T) {
	ctx := context.TODO()
	organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning())
	etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
		Endpoints: []string{defaultEtcdEndpoint},
	})
	require.NoError(t, err)

	key := organizer.CounterPath(0, "count")
	// Zero TTL: the counter record never expires. Initial value is 5.
	counter := NewEtcdCounter(etcdClient, key, 5, time.Duration(0))

	value, updated, err := counter.Increment(ctx, -1)
	require.NoError(t, err)
	assert.True(t, updated)
	assert.Equal(t, 4, value)

	value, updated, err = counter.Increment(ctx, -2)
	require.NoError(t, err)
	assert.True(t, updated)
	assert.Equal(t, 2, value)

	value, updated, err = counter.Increment(ctx, -3)
	require.NoError(t, err)
	assert.True(t, updated)
	assert.Equal(t, -1, value)

	// Deletes db record.
	err = counter.Delete(ctx)
	assert.NoError(t, err)

	// Counter is deleted in db but the in-memory value continues.
	// Even if key is deleted in the db, a new operation will set it again.
	value, updated, err = counter.Increment(ctx, 0)
	require.NoError(t, err)
	assert.True(t, updated)
	assert.Equal(t, -1, value)

	// Deletes db record again.
	err = counter.Delete(ctx)
	assert.NoError(t, err)

	// A new instance will start from initialValue since the db record is deleted.
	counter = NewEtcdCounter(etcdClient, key, 5, time.Duration(0))
	value, updated, err = counter.Increment(ctx, 0)
	require.NoError(t, err)
	assert.True(t, updated)
	assert.Equal(t, 5, value)
}

func TestCounterExpiration(t *testing.T) {
ctx := context.TODO()
organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning())
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{defaultEtcdEndpoint},
})
require.NoError(t, err)

key := organizer.CounterPath(0, "count")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, 0, 2*time.Second)

value, updated, err := counter.Increment(ctx, 1)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 1, value)

// Enough time to expire in the database.
time.Sleep(3 * time.Second)

// New instance to make sure we re-read it from DB.
counter = NewEtcdCounter(etcdClient, key, 0, time.Duration(0))

value, updated, err = counter.Increment(ctx, 2)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 2, value)

// Zero duration means it never expires.
// Enough time to make sure the previous TTL did not apply anymore.
time.Sleep(3 * time.Second)

// A new instance will start from 0 since the db record is expired.
counter = NewEtcdCounter(etcdClient, key, time.Second)
value, updated, err = counter.Add(ctx, 0, time.Now().Add(time.Second))
value, updated, err = counter.Increment(ctx, 3)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 0, value)
assert.Equal(t, 5, value)
}

// TestCounterDeleteUnknownKey checks that deleting a counter whose key was
// never written to etcd succeeds without error (delete is a no-op on a
// missing key).
func TestCounterDeleteUnknownKey(t *testing.T) {
	ctx := context.TODO()
	organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning())
	etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
		Endpoints: []string{defaultEtcdEndpoint},
	})
	require.NoError(t, err)

	key := organizer.CounterPath(0, "unknown")
	// The 2s TTL is irrelevant here: Increment is never called, so no record
	// is ever created under this key.
	counter := NewEtcdCounter(etcdClient, key, 0, 2*time.Second)

	// Deleting a non-existent key must not surface an error.
	err = counter.Delete(ctx)
	assert.NoError(t, err)
}

func randomNamespace() string {
Expand Down
52 changes: 34 additions & 18 deletions counting/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,48 @@ import (

// Counter is a persistent integer counter keyed by a single record.
type Counter interface {
	// Increment applies the given delta (+ or -) and returns the updated value.
	// Returns (updated value, true if value was updated in memory, err if any error happened).
	// It is possible that the value is updated but an error occurred while trying to persist it.
	Increment(context.Context, int) (int, bool, error)

	// Delete removes the counter's record from the database.
	Delete(context.Context) error
}

// It keeps a cache of the value and updates async.
// It works assuming there cannot be two concurrent writes to the same key.
// Concurrency is handled at the job level, which makes this work.
type etcdcounter struct {
	etcdclient   *etcdclientv3.Client
	key          string
	initialValue int // value the counter starts from when no record exists in etcd

	loaded bool          // true once the value has been read (or seeded) for this instance
	value  int           // in-memory cached counter value
	ttl    time.Duration // record TTL applied on first write; zero means the record never expires
}

func NewEtcdCounter(c *etcdclientv3.Client, key string, ttlOffset time.Duration) Counter {
func NewEtcdCounter(c *etcdclientv3.Client, key string, initialValue int, ttl time.Duration) Counter {
return &etcdcounter{
etcdclient: c,
key: key,
ttlOffset: ttlOffset,
etcdclient: c,
initialValue: initialValue,
key: key,
ttl: ttl,
}
}

func (c *etcdcounter) Add(ctx context.Context, delta int, next time.Time) (int, bool, error) {
func (c *etcdcounter) Increment(ctx context.Context, delta int) (int, bool, error) {
firstWrite := false

if !c.loaded {
// First, load the key's value.
res, err := c.etcdclient.KV.Get(ctx, c.key)
if err != nil {
return 0, false, err
}
if len(res.Kvs) == 0 {
c.value = 0
c.value = c.initialValue
c.loaded = true
firstWrite = true // No value for key, this is first write.
} else {
if res.Kvs[0].Value == nil {
return 0, false, fmt.Errorf("nil value for key %s", c.key)
Expand All @@ -68,12 +74,22 @@ func (c *etcdcounter) Add(ctx context.Context, delta int, next time.Time) (int,
}

c.value += delta
// Create a lease
ttl := time.Until(next.Add(c.ttlOffset))
lease, err := c.etcdclient.Grant(ctx, int64(ttl.Seconds()))
if err != nil {

if firstWrite && (c.ttl > time.Duration(0)) {
// Counter will expire after some time, so first write is special in this case.
lease, err := c.etcdclient.Grant(ctx, int64(c.ttl.Seconds()))
if err != nil {
return c.value, true, err
}
_, err = c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value), etcdclientv3.WithLease(lease.ID))
return c.value, true, err
}
_, err = c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value), etcdclientv3.WithLease(lease.ID))

_, err := c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value))
return c.value, true, err
}

// Delete removes the counter's record from etcd. The in-memory cached value
// and loaded flag are left untouched.
func (c *etcdcounter) Delete(ctx context.Context) error {
	if _, err := c.etcdclient.KV.Delete(ctx, c.key); err != nil {
		return err
	}
	return nil
}
62 changes: 41 additions & 21 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func New(opts ...CronOpt) (*Cron, error) {
return cron.scheduleJob(jobFromJobRecord(r))
},
func(ctx context.Context, s string) error {
cron.killJob(s)
return nil
return cron.onJobDeleted(ctx, s)
})
}

Expand All @@ -241,6 +240,20 @@ func (c *Cron) DeleteJob(ctx context.Context, jobName string) error {
return c.jobStore.Delete(ctx, jobName)
}

// onJobDeleted reacts to a job record being removed from the store: it stops
// the in-memory schedule for the job and then tries to remove the job's
// invocation counter from etcd. Counter cleanup is best effort and never
// fails the callback.
func (c *Cron) onJobDeleted(ctx context.Context, jobName string) error {
	c.killJob(jobName)
	// Best effort to delete the counter.
	partitionId := c.partitioning.CalculatePartitionId(jobName)
	counterKey := c.organizer.CounterPath(partitionId, jobName)
	// initialValue and ttl are irrelevant for a delete-only counter instance.
	counter := counting.NewEtcdCounter(c.etcdclient, counterKey, 0, time.Duration(0))
	err := counter.Delete(ctx)
	if err != nil {
		c.errorsHandler(ctx, Job{Name: jobName}, err)
	}
	// Ignore error as it is a best effort.
	return nil
}

func (c *Cron) killJob(name string) {
c.appendOperation(func(ctx context.Context) *Entry {
_, ok := c.entries[name]
Expand Down Expand Up @@ -268,11 +281,19 @@ func (c *Cron) GetJob(jobName string) *Job {

// scheduleJob parses the job's rhythm expression, reconciles any repeat count
// embedded in the expression with the job record's Repeats field, and
// schedules the job.
func (c *Cron) scheduleJob(job *Job) error {
	s, repeats, err := rhythm.Parse(job.Rhythm)
	if err != nil {
		return err
	}

	// A repeat count may come from the schedule expression ("Dapr" format) or
	// from the job record; if both are set they must agree.
	if (repeats > 0) && (job.Repeats > 0) && (job.Repeats != int32(repeats)) {
		return fmt.Errorf("conflicting number of repeats: %v vs %v", repeats, job.Repeats)
	}

	if repeats > 0 {
		job.Repeats = int32(repeats)
	}

	return c.schedule(s, job)
}

Expand All @@ -287,9 +308,9 @@ func (c *Cron) schedule(schedule rhythm.Schedule, job *Job) error {
var counter counting.Counter
if job.Repeats > 0 {
counterKey := c.organizer.CounterPath(partitionId, job.Name)
// Needs to count the number of invocations
// TODO(artursouza): get ttl param for counter instead of hardcode it.
counter = counting.NewEtcdCounter(c.etcdclient, counterKey, 48*time.Hour)
// Needs to count the number of invocations.
// Follows the job's TTL (if set).
counter = counting.NewEtcdCounter(c.etcdclient, counterKey, int(job.Repeats), job.TTL)
}

entry := &Entry{
Expand Down Expand Up @@ -421,7 +442,7 @@ func (c *Cron) run(ctx context.Context) {
tickLock := e.distMutexPrefix + fmt.Sprintf("%v", effective.Unix())
m, err := mutexStore.Get(tickLock)
if err != nil {
go c.etcdErrorsHandler(ctx, e.Job, errors.Wrapf(err, "fail to create etcd mutex for job '%v'", e.Job.Name))
c.etcdErrorsHandler(ctx, e.Job, errors.Wrapf(err, "fail to create etcd mutex for job '%v'", e.Job.Name))
return
}

Expand All @@ -436,13 +457,13 @@ func (c *Cron) run(ctx context.Context) {
if err == context.DeadlineExceeded {
return
} else if err != nil {
go c.etcdErrorsHandler(ctx, e.Job, errors.Wrapf(err, "fail to lock mutex '%v'", m.Key()))
c.etcdErrorsHandler(ctx, e.Job, errors.Wrapf(err, "fail to lock mutex '%v'", m.Key()))
return
}

result, err := c.triggerFunc(ctx, e.Job.Metadata, e.Job.Payload)
if err != nil {
go c.errorsHandler(ctx, e.Job, err)
c.errorsHandler(ctx, e.Job, err)
return
}

Expand All @@ -452,33 +473,32 @@ func (c *Cron) run(ctx context.Context) {
// One example, is having a more efficient way to count number of invocations.
err = c.DeleteJob(ctx, e.Job.Name)
if err != nil {
go c.errorsHandler(ctx, e.Job, err)
c.errorsHandler(ctx, e.Job, err)
}

// No need to check (and delete) a counter since every counter has a TTL.
return
}

increment := 1
if result == Failure {
// Does not increment counter, but refreshes the TTL.
increment = 0
}

if e.counter != nil {
if result == OK && e.counter != nil {
// Needs to check number of triggers
count, updated, err := e.counter.Add(ctx, increment, next)
remaining, updated, err := e.counter.Increment(ctx, -1)
if err != nil {
go c.errorsHandler(ctx, e.Job, err)
c.errorsHandler(ctx, e.Job, err)
// No need to abort if updating the count failed.
// The count solution is not transactional anyway.
}

if updated {
if count >= int(e.Job.Repeats) {
if remaining <= 0 {
err = c.DeleteJob(ctx, e.Job.Name)
if err != nil {
go c.errorsHandler(ctx, e.Job, err)
c.errorsHandler(ctx, e.Job, err)
}
// Tries to delete the counter here
err = e.counter.Delete(ctx)
if err != nil {
c.errorsHandler(ctx, e.Job, err)
}
}
}
Expand Down
Loading

0 comments on commit eab9ba8

Please sign in to comment.