Skip to content

Commit

Permalink
pass private call execution func
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 7728f85 commit b174655
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 35 deletions.
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
increaseReplicaFailures: map[string]int{},
}

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc)
_, _ = svc.updateControllersList(ctx)
Expand Down
16 changes: 7 additions & 9 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ type Scheduler interface {
Parallel(retry backoff.Backoff, job scheduledtask.Job)
}

type CallExecuter interface {
CallWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error)
}
type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error)

type Service struct {
config Config
Expand All @@ -79,26 +77,26 @@ type Service struct {

dal DAL
scheduler Scheduler
executor CallExecuter // Change the type from *CallExecuter to CallExecuter
call ExecuteCallFunc

clock clock.Clock
jobChanges *pubsub.Topic[jobChange]

hashRingState atomic.Value[*hashRingState]
}

func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, executor CallExecuter) *Service {
return NewForTesting(ctx, key, originURL, config, dal, scheduler, executor, clock.New())
func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service {
return NewForTesting(ctx, key, originURL, config, dal, scheduler, call, clock.New())
}

func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, executor CallExecuter, clock clock.Clock) *Service {
func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service {
svc := &Service{
config: config,
key: key,
originURL: originURL,
dal: dal,
scheduler: scheduler,
executor: executor,
call: call,
clock: clock,
jobChanges: pubsub.New[jobChange](),
}
Expand Down Expand Up @@ -196,7 +194,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()
_, err = s.executor.CallWithRequest(callCtx, req, optional.Some(requestKey), s.originURL.Host)
_, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host)
if err != nil {
logger.Errorf(err, "failed to execute cron job %s", job.Ref.String())
}
Expand Down
41 changes: 16 additions & 25 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string
defer d.lock.Unlock()

job := dal.CronJob{
Key: model.NewCronJobKey(module, verb),
DeploymentKey: deploymentKey,
Ref: schema.Ref{Module: module, Name: verb},
Schedule: schedule,
Expand Down Expand Up @@ -137,22 +138,6 @@ func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

type mockExecutor struct {
verbCallCount map[string]int
lock sync.Mutex
clock *clock.Mock
}

func (e *mockExecutor) CallWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(req.Msg.Verb)

e.lock.Lock()
e.verbCallCount[verbRef.Name]++
e.lock.Unlock()

return &connect.Response[ftlv1.CallResponse]{}, nil
}

type controller struct {
key model.ControllerKey
DAL DAL
Expand All @@ -173,11 +158,9 @@ func TestService(t *testing.T) {
attemptCountMap: map[string]int{},
}
scheduler := &mockScheduler{}
executor := &mockExecutor{
verbCallCount: map[string]int{},
lock: sync.Mutex{},
clock: clock,
}

verbCallCount := map[string]int{}
verbCallCountLock := sync.Mutex{}

// initial jobs
for i := range 20 {
Expand All @@ -195,9 +178,17 @@ func TestService(t *testing.T) {
for i := range 5 {
key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
controller := &controller{
key: key,
DAL: mockDal,
cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, executor, clock),
key: key,
DAL: mockDal,
cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(r.Msg.Verb)

verbCallCountLock.Lock()
verbCallCount[verbRef.Name]++
verbCallCountLock.Unlock()

return &connect.Response[ftlv1.CallResponse]{}, nil
}, clock),
}
controllers = append(controllers, controller)
}
Expand All @@ -223,7 +214,7 @@ func TestService(t *testing.T) {
}

for _, j := range mockDal.jobs {
count := executor.verbCallCount[j.Ref.Name]
count := verbCallCount[j.Ref.Name]
assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name)
}
}

0 comments on commit b174655

Please sign in to comment.