Skip to content

Commit

Permalink
Builtin TTL for Job and Counter. (#9)
Browse files Browse the repository at this point in the history
* Support "Dapr" format for schedule.

Signed-off-by: Artur Souza <[email protected]>

* Remove extra go routines for error handling callback.

Signed-off-by: Artur Souza <[email protected]>

* Use require.NoError() in cron_test

Signed-off-by: Artur Souza <[email protected]>

* Assert error on AddJob for cron_test.

Signed-off-by: Artur Souza <[email protected]>

* Use Eventually

Signed-off-by: Artur Souza <[email protected]>

* Extra struct for calendar constructor.

Signed-off-by: Artur Souza <[email protected]>

* Built-in TTL - not using etcd.

Signed-off-by: Artur Souza <[email protected]>

* Update proto/storage/job.proto

Co-authored-by: Cassie Coyle <[email protected]>
Signed-off-by: Artur Souza <[email protected]>

* Fix conflict bug.

Signed-off-by: Artur Souza <[email protected]>

* Fix some other stuff for expiration + new test

Signed-off-by: Artur Souza <[email protected]>

---------

Signed-off-by: Artur Souza <[email protected]>
Co-authored-by: Cassie Coyle <[email protected]>
  • Loading branch information
artursouza and cicoyle authored Apr 4, 2024
1 parent eab9ba8 commit 4d08c37
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 189 deletions.
8 changes: 4 additions & 4 deletions .examples/cron_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ func main() {
Payload: &anypb.Any{Value: []byte("ev 7s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-1s-then-expire-hadfh452erhh",
Rhythm: "*/1 * * * * *",
TTL: 10,
Payload: &anypb.Any{Value: []byte("ev 1s then expires after 10s")},
Name: "every-1s-then-expire-hadfh452erhh",
Rhythm: "*/1 * * * * *",
Expiration: time.Now().Add(10 * time.Second),
Payload: &anypb.Any{Value: []byte("ev 1s then expires after 10s")},
})
}
cron.Start(ctx)
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ modtidy:
# Target: gen-proto #
################################################################################
PROTOC ?=protoc
PROTOC_VERSION = 3.21.12
PROTOBUF_SUITE_VERSION = 21.12
PROTOC_GEN_GO_VERSION = v1.28.1
PROTOC_VERSION = 24.4
PROTOBUF_SUITE_VERSION = 24.4
PROTOC_GEN_GO_VERSION = v1.32.0

PROTOC_GEN_GO_GRPC_VERSION = 1.2.0
PROTOC_GEN_GO_GRPC_VERSION = 1.3.0

PROTOS:=$(shell ls proto)
PROTO_PREFIX:=github.com/diagridio/go-etcd-cron
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ cron.AddJob(context.TODO(), Job{
Pre-requisites to run the tests locally:
- Run etcd locally via one of the options below:
- Locally: [Install etcd](https://etcd.io/docs/v3.4/install/) then run `etcd --logger=zap`
- Docker: [Running a single node etcd](https://etcd.io/docs/v3.5/op-guide/container/#running-a-single-node-etcd-1)
- Docker: [Running a single node etcd](https://etcd.io/docs/v3.5/op-guide/container/#running-a-single-node-etcd-1), for example:
```
docker run -d -p 2379:2379 \
-e ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 \
-e ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 \
--name etcd quay.io/coreos/etcd:v3.5.5
```
```bash
make test
Expand Down
48 changes: 5 additions & 43 deletions counting/counting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestCounterIncrement(t *testing.T) {

key := organizer.CounterPath(0, "count")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, 0, time.Duration(0))
counter := NewEtcdCounter(etcdClient, key, 0)

value, updated, err := counter.Increment(ctx, 1)
require.NoError(t, err)
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestCounterIncrement(t *testing.T) {
assert.NoError(t, err)

// A new instance will start from 0 since the db record does not exist.
counter = NewEtcdCounter(etcdClient, key, 0, time.Duration(0))
counter = NewEtcdCounter(etcdClient, key, 0)
value, updated, err = counter.Increment(ctx, 0)
require.NoError(t, err)
assert.True(t, updated)
Expand All @@ -79,7 +79,7 @@ func TestCounterDecrement(t *testing.T) {

key := organizer.CounterPath(0, "count")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, 5, time.Duration(0))
counter := NewEtcdCounter(etcdClient, key, 5)

value, updated, err := counter.Increment(ctx, -1)
require.NoError(t, err)
Expand Down Expand Up @@ -114,46 +114,9 @@ func TestCounterDecrement(t *testing.T) {
assert.NoError(t, err)

// A new instance will start from initialValue since the db record is deleted.
counter = NewEtcdCounter(etcdClient, key, 5, time.Duration(0))
counter = NewEtcdCounter(etcdClient, key, 5)
value, updated, err = counter.Increment(ctx, 0)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 5, value)
}

func TestCounterExpiration(t *testing.T) {
ctx := context.TODO()
organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning())
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{defaultEtcdEndpoint},
})
require.NoError(t, err)

key := organizer.CounterPath(0, "count")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, 0, 2*time.Second)

value, updated, err := counter.Increment(ctx, 1)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 1, value)

// Enough time to expire in the database.
time.Sleep(3 * time.Second)

// New instance to make sure we re-read it from DB.
counter = NewEtcdCounter(etcdClient, key, 0, time.Duration(0))

value, updated, err = counter.Increment(ctx, 2)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 2, value)

// Zero duration means it never expires.
// Enough time to make sure the previous TTL did not apply anymore.
time.Sleep(3 * time.Second)

value, updated, err = counter.Increment(ctx, 3)
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 5, value)
Expand All @@ -168,8 +131,7 @@ func TestCounterDeleteUnknownKey(t *testing.T) {
require.NoError(t, err)

key := organizer.CounterPath(0, "unknown")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, 0, 2*time.Second)
counter := NewEtcdCounter(etcdClient, key, 0)

err = counter.Delete(ctx)
assert.NoError(t, err)
Expand Down
70 changes: 35 additions & 35 deletions counting/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"fmt"
"strconv"
"time"

etcdclientv3 "go.etcd.io/etcd/client/v3"
)
Expand All @@ -19,7 +18,7 @@ type Counter interface {
// Returns (updated value, true if value was updated in memory, err if any error happened)
// It is possible that the value is updated but an error occurred while trying to persist it.
Increment(context.Context, int) (int, bool, error)

Refresh(context.Context) (int, error)
Delete(context.Context) error
}

Expand All @@ -33,58 +32,34 @@ type etcdcounter struct {

loaded bool
value int
ttl time.Duration
}

func NewEtcdCounter(c *etcdclientv3.Client, key string, initialValue int, ttl time.Duration) Counter {
func NewEtcdCounter(c *etcdclientv3.Client, key string, initialValue int) Counter {
return &etcdcounter{
etcdclient: c,
initialValue: initialValue,
key: key,
ttl: ttl,
value: initialValue,
}
}

func (c *etcdcounter) Increment(ctx context.Context, delta int) (int, bool, error) {
firstWrite := false

if !c.loaded {
// First, load the key's value.
res, err := c.etcdclient.KV.Get(ctx, c.key)
_, err := c.Refresh(ctx)
if err != nil {
return 0, false, err
return c.value, false, err
}
if len(res.Kvs) == 0 {
c.value = c.initialValue
c.loaded = true
firstWrite = true // No value for key, this is first write.
} else {
if res.Kvs[0].Value == nil {
return 0, false, fmt.Errorf("nil value for key %s", c.key)
}
if len(res.Kvs[0].Value) == 0 {
return 0, false, fmt.Errorf("empty value for key %s", c.key)
}
c.loaded = true
}

c.value, err = strconv.Atoi(string(res.Kvs[0].Value))
if err != nil {
return 0, false, err
}
}
if delta == 0 {
// No need to do a db write for a no-change operation.
return c.value, true, nil
}

c.value += delta

if firstWrite && (c.ttl > time.Duration(0)) {
// Counter will expire after some time, so first write is special in this case.
lease, err := c.etcdclient.Grant(ctx, int64(c.ttl.Seconds()))
if err != nil {
return c.value, true, err
}
_, err = c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value), etcdclientv3.WithLease(lease.ID))
return c.value, true, err
}

_, err := c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value))
return c.value, true, err
}
Expand All @@ -93,3 +68,28 @@ func (c *etcdcounter) Delete(ctx context.Context) error {
_, err := c.etcdclient.KV.Delete(ctx, c.key)
return err
}

func (c *etcdcounter) Refresh(ctx context.Context) (int, error) {
c.loaded = false
res, err := c.etcdclient.KV.Get(ctx, c.key)
if err != nil {
return c.value, err
}
if len(res.Kvs) == 0 {
c.value = c.initialValue
c.loaded = true
return c.value, nil
}

if res.Kvs[0].Value == nil {
return c.value, fmt.Errorf("nil value for key %s", c.key)
}
if len(res.Kvs[0].Value) == 0 {
return c.value, fmt.Errorf("empty value for key %s", c.key)
}
v, err := strconv.Atoi(string(res.Kvs[0].Value))
if err == nil {
c.value = v
}
return c.value, err
}
Loading

0 comments on commit 4d08c37

Please sign in to comment.