Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Cassandra Coyle <[email protected]>
  • Loading branch information
cicoyle committed Dec 6, 2024
2 parents fd16ff3 + a01f2ae commit 43adc3b
Show file tree
Hide file tree
Showing 13 changed files with 458 additions and 140 deletions.
1 change: 1 addition & 0 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (c *cron) Run(ctx context.Context) error {
c.lock.Lock()
close(c.readyCh)
c.lock.Unlock()

err = engine.Run(leadershipCtx)

c.lock.Lock()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v4 v4.5.1 h1:JdqV9zKUdtaa9gdPlywC3aeoEsR681PlKC+4F5gQgeo=
github.com/golang-jwt/jwt/v4 v4.5.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
3 changes: 2 additions & 1 deletion internal/api/serve_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*v3Copyright (c) 2024 Diagrid Inc.
/*
Copyright (c) 2024 Diagrid Inc.
Licensed under the MIT License.
*/

Expand Down
49 changes: 18 additions & 31 deletions internal/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,24 @@ func New(ctx context.Context, opts Options) (Interface, bool, error) {
return nil, false, err
}

c := &counter{
name: opts.Name,
counterKey: counterKey,
jobKey: jobKey,
client: opts.Client,
schedule: opts.Schedule,
job: opts.Job,
yard: opts.Yard,
collector: opts.Collector,
triggerRequest: &api.TriggerRequest{
Name: opts.Name,
Metadata: opts.Job.GetJob().GetMetadata(),
Payload: opts.Job.GetJob().GetPayload(),
},
}

if res.Count == 0 {
c := &counter{
name: opts.Name,
jobKey: jobKey,
counterKey: counterKey,
client: opts.Client,
schedule: opts.Schedule,
job: opts.Job,
count: &stored.Counter{JobPartitionId: opts.Job.GetPartitionId()},
yard: opts.Yard,
collector: opts.Collector,
triggerRequest: &api.TriggerRequest{
Name: opts.Name,
Metadata: opts.Job.GetJob().GetMetadata(),
Payload: opts.Job.GetJob().GetPayload(),
},
}
c.count = &stored.Counter{JobPartitionId: opts.Job.GetPartitionId()}

if ok, err := c.tickNext(); err != nil || !ok {
return nil, false, err
Expand All @@ -131,21 +132,7 @@ func New(ctx context.Context, opts Options) (Interface, bool, error) {
}
}

c := &counter{
counterKey: counterKey,
jobKey: jobKey,
client: opts.Client,
schedule: opts.Schedule,
job: opts.Job,
count: count,
yard: opts.Yard,
collector: opts.Collector,
triggerRequest: &api.TriggerRequest{
Name: opts.Name,
Metadata: opts.Job.GetJob().GetMetadata(),
Payload: opts.Job.GetJob().GetPayload(),
},
}
c.count = count

if ok, err := c.tickNext(); err != nil || !ok {
return nil, false, err
Expand Down
2 changes: 2 additions & 0 deletions internal/counter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func Test_New(t *testing.T) {
assert.True(t, ok)
assert.NotNil(t, c)

assert.Equal(t, "1", c.JobName())

cancel()
select {
case err := <-errCh:
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/dapr/kit/concurrency"
"github.com/diagridio/go-etcd-cron/internal/leadership"
"github.com/go-logr/logr"
"k8s.io/utils/clock"

Expand All @@ -28,6 +27,7 @@ import (
"github.com/diagridio/go-etcd-cron/internal/partitioner"
"github.com/diagridio/go-etcd-cron/internal/queue"
"github.com/diagridio/go-etcd-cron/internal/scheduler"
"github.com/diagridio/go-etcd-cron/internal/leadership"
)

// Options are the options for creating a new engine instance.
Expand Down
13 changes: 7 additions & 6 deletions internal/leadership/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (

"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/events/batcher"
"github.com/diagridio/go-etcd-cron/internal/api/stored"

"github.com/go-logr/logr"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/diagridio/go-etcd-cron/internal/client"
"github.com/diagridio/go-etcd-cron/internal/key"
"github.com/diagridio/go-etcd-cron/internal/api/stored"
)

// Options are the options for the Leadership.
Expand All @@ -49,11 +50,11 @@ type Options struct {
// partition, as well as ensuring that there are no other active partitions
// which are acting on a different partition total.
type Leadership struct {
log logr.Logger
client client.Interface
log logr.Logger
client client.Interface
batcher *batcher.Batcher[int, struct{}]
lock sync.RWMutex
wg sync.WaitGroup
lock sync.RWMutex
wg sync.WaitGroup

partitionTotal uint32
key *key.Key
Expand Down Expand Up @@ -218,9 +219,9 @@ func (l *Leadership) loop(ctx context.Context) error {

l.lock.Lock()
defer l.lock.Unlock()

l.readyCh = make(chan struct{})
close(l.changeCh)
l.batcher.Batch(0, struct{}{}) // notify subscribers of change
l.changeCh = make(chan struct{})

return ctx.Err()
Expand Down
Loading

0 comments on commit 43adc3b

Please sign in to comment.