Skip to content

Commit

Permalink
feat: keys include host-port-suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Mar 12, 2024
1 parent 8f4e3b4 commit 1211f7d
Show file tree
Hide file tree
Showing 19 changed files with 255 additions and 167 deletions.
8 changes: 4 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"github.com/oklog/ulid/v2"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -55,7 +54,7 @@ type Config struct {
ConsoleURL *url.URL `help:"The public URL of the console (for CORS)." env:"FTL_CONTROLLER_CONSOLE_URL"`
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
ContentTime time.Time `help:"Time to use for console resource timestamps." default:"${timestamp=1970-01-01T00:00:00Z}"`
Key model.ControllerKey `help:"Controller key (auto)." placeholder:"C<ULID>" default:"C00000000000000000000000000"`
Key model.ControllerKey `help:"Controller key (auto)."`
DSN string `help:"DAL DSN." default:"postgres://localhost:54320/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"`
RunnerTimeout time.Duration `help:"Runner heartbeat timeout." default:"10s"`
DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"`
Expand Down Expand Up @@ -152,9 +151,10 @@ type Service struct {
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
var zero model.ControllerKey
key := config.Key
if config.Key.ULID() == (ulid.ULID{}) {
key = model.NewControllerKey()
if config.Key == zero {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
}
config.SetDefaults()
svc := &Service{
Expand Down
90 changes: 69 additions & 21 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,14 @@ func runnerFromDB(row sql.GetRunnerRow) Runner {
if err := json.Unmarshal(row.Labels, &attrs); err != nil {
return Runner{}
}

key, err := model.ParseRunnerDBKey(string(row.RunnerKey))

Check failure on line 95 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return Runner{}
}

return Runner{
Key: model.RunnerKey(row.RunnerKey),
Key: key,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Deployment: deployment,
Expand Down Expand Up @@ -308,8 +314,14 @@ func (d *DAL) GetStatus(
if err := json.Unmarshal(in.Labels, &attrs); err != nil {
return Runner{}, fmt.Errorf("invalid attributes JSON for runner %s: %w", in.RunnerKey, err)
}

key, err := model.ParseRunnerDBKey(string(in.RunnerKey))

Check failure on line 318 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return Runner{}, fmt.Errorf("invalid id for runner %s: %w", in.RunnerKey, err)
}

return Runner{
Key: model.RunnerKey(in.RunnerKey),
Key: key,
Endpoint: in.Endpoint,
State: RunnerState(in.State),
Deployment: deployment,
Expand All @@ -333,9 +345,13 @@ func (d *DAL) GetStatus(
}
}),
Routes: slices.Map(routes, func(row sql.GetRoutingTableRow) Route {
key, err := model.ParseRunnerDBKey(string(row.RunnerKey))

Check failure on line 348 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return Route{}
}
return Route{
Module: row.ModuleName.MustGet(),
Runner: model.RunnerKey(row.RunnerKey),
Runner: key,
Deployment: row.DeploymentName,
Endpoint: row.Endpoint,
}
Expand All @@ -354,8 +370,14 @@ func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.Depl
if err := json.Unmarshal(row.Labels, &attrs); err != nil {
return nil, fmt.Errorf("invalid attributes JSON for runner %d: %w", row.ID, err)
}

key, err := model.ParseRunnerDBKey(string(row.Key))

Check failure on line 374 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return nil, fmt.Errorf("invalid id for runner %d: %w", row.ID, err)
}

runners = append(runners, Runner{
Key: model.RunnerKey(row.Key),
Key: key,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Deployment: optional.Some(deployment),
Expand Down Expand Up @@ -502,7 +524,7 @@ func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error {
return fmt.Errorf("%s: %w", "failed to JSON encode runner labels", err)
}
deploymentID, err := d.db.UpsertRunner(ctx, sql.UpsertRunnerParams{
Key: sql.Key(runner.Key),
Key: dbKeyFromRunnerKey(runner.Key),
Endpoint: runner.Endpoint,
State: sql.RunnerState(runner.State),
DeploymentName: pgDeploymentName,
Expand Down Expand Up @@ -534,7 +556,7 @@ func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int6

// DeregisterRunner deregisters the given runner.
func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error {
count, err := d.db.DeregisterRunner(ctx, sql.Key(key))
count, err := d.db.DeregisterRunner(ctx, dbKeyFromRunnerKey(key))
if err != nil {
return translatePGError(err)
}
Expand Down Expand Up @@ -574,11 +596,17 @@ func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.D
cancel()
return nil, fmt.Errorf("failed to JSON decode labels for runner %d: %w", runner.ID, err)
}
key, err := model.ParseRunnerDBKey(string(runner.Key))

Check failure on line 599 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
cancel()
return nil, fmt.Errorf("invalid id for runner %d: %w", runner.ID, err)
}

return &postgresClaim{
cancel: cancel,
tx: tx,
runner: Runner{
Key: model.RunnerKey(runner.Key),
Key: key,
Endpoint: runner.Endpoint,
State: RunnerState(runner.State),
Deployment: optional.Some(deployment),
Expand Down Expand Up @@ -628,7 +656,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentNam
}

err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{
DeploymentName: key.String(),
DeploymentName: string(key),
MinReplicas: int32(minReplicas),
PrevMinReplicas: deployment.MinReplicas,
})
Expand Down Expand Up @@ -766,8 +794,12 @@ func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error) {
if err := json.Unmarshal(row.RunnerLabels, &labels); err != nil {
return Process{}, fmt.Errorf("invalid labels JSON for runner %s: %w", row.RunnerKey, err)
}
key, err := model.ParseRunnerDBKey(string(row.RunnerKey.MustGet()))

Check failure on line 797 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return Process{}, fmt.Errorf("invalid runner key %s: %w", row.RunnerKey, err)
}
runner = optional.Some(ProcessRunner{
Key: model.RunnerKey(row.RunnerKey.MustGet()),
Key: key,
Endpoint: endpoint,
Labels: labels,
})
Expand Down Expand Up @@ -812,8 +844,12 @@ func (d *DAL) GetIdleRunners(ctx context.Context, limit int, labels model.Labels
if err != nil {
return Runner{}, fmt.Errorf("%s: %w", "could not unmarshal labels", err)
}
key, err := model.ParseRunnerDBKey(string(row.Key))

Check failure on line 847 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return Runner{}, fmt.Errorf("%s: %w", "invalid runner key", err)
}
return Runner{
Key: model.RunnerKey(row.Key),
Key: key,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Labels: labels,
Expand All @@ -835,28 +871,31 @@ func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string
}
out := make(map[string][]Route, len(routes))
for _, route := range routes {
// This is guaranteed to be non-nil by the query, but sqlc doesn't quite understand that.
moduleName := route.ModuleName.MustGet()
out[moduleName] = append(out[moduleName], Route{
Module: moduleName,
Deployment: route.DeploymentName,
Runner: model.RunnerKey(route.RunnerKey),
Endpoint: route.Endpoint,
})
if runnerKey, err := model.ParseRunnerDBKey(string(route.RunnerKey)); err == nil {

// This is guaranteed to be non-nil by the query, but sqlc doesn't quite understand that.
moduleName := route.ModuleName.MustGet()
out[moduleName] = append(out[moduleName], Route{
Module: moduleName,
Deployment: route.DeploymentName,
Runner: runnerKey,
Endpoint: route.Endpoint,
})
}
}
return out, nil
}

func (d *DAL) GetRunnerState(ctx context.Context, runnerKey model.RunnerKey) (RunnerState, error) {
state, err := d.db.GetRunnerState(ctx, sql.Key(runnerKey))
state, err := d.db.GetRunnerState(ctx, dbKeyFromRunnerKey(runnerKey))
if err != nil {
return "", translatePGError(err)
}
return RunnerState(state), nil
}

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error) {
row, err := d.db.GetRunner(ctx, sql.Key(runnerKey))
row, err := d.db.GetRunner(ctx, dbKeyFromRunnerKey(runnerKey))
if err != nil {
return Runner{}, translatePGError(err)
}
Expand Down Expand Up @@ -926,8 +965,12 @@ func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRou
return nil, ErrNotFound
}
return slices.Map(routes, func(row sql.GetIngressRoutesRow) IngressRoute {
key, err := model.ParseRunnerDBKey(string(row.RunnerKey))

Check failure on line 968 in backend/controller/dal/dal.go

View workflow job for this annotation

GitHub Actions / Lint

unnecessary conversion (unconvert)
if err != nil {
return IngressRoute{}
}
return IngressRoute{
Runner: model.RunnerKey(row.RunnerKey),
Runner: key,
Deployment: row.DeploymentName,
Endpoint: row.Endpoint,
Path: row.Path,
Expand Down Expand Up @@ -1043,3 +1086,8 @@ func translatePGError(err error) error {
}
return err
}

func dbKeyFromRunnerKey(key model.RunnerKey) string {
value, _ := key.Value()
return value.(string)
}
6 changes: 3 additions & 3 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDAL(t *testing.T) {
assert.Equal(t, []sha256.SHA256{misshingSHA}, missing)
})

runnerID := model.NewRunnerKey()
runnerID := model.NewRunnerKey("localhost", "8080")
labels := map[string]any{"languages": []any{"go"}}

t.Run("RegisterRunner", func(t *testing.T) {
Expand All @@ -101,7 +101,7 @@ func TestDAL(t *testing.T) {

t.Run("RegisterRunnerFailsOnDuplicate", func(t *testing.T) {
err = dal.UpsertRunner(ctx, Runner{
Key: model.NewRunnerKey(),
Key: model.NewRunnerKey("localhost", "8080"),
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateIdle,
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestDAL(t *testing.T) {
})

t.Run("DeregisterRunnerFailsOnMissing", func(t *testing.T) {
err = dal.DeregisterRunner(ctx, model.NewRunnerKey())
err = dal.DeregisterRunner(ctx, model.NewRunnerKey("localhost", "8080"))
assert.IsError(t, err, ErrNotFound)
})
}
Expand Down
20 changes: 13 additions & 7 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type LocalScaling struct {

portAllocator *bind.BindAllocator
controllerAddresses []*url.URL

prevRunnerSuffix int
}

func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL) (*LocalScaling, error) {
Expand All @@ -40,6 +42,7 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u
runners: map[model.RunnerKey]context.CancelFunc{},
portAllocator: portAllocator,
controllerAddresses: controllerAddresses,
prevRunnerSuffix: 0, // first runner will have an id of r-0001
}, nil
}

Expand Down Expand Up @@ -72,25 +75,28 @@ func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunner

logger.Debugf("Adding %d replicas", replicasToAdd)
for i := 0; i < replicasToAdd; i++ {
i := i

controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]

bind := l.portAllocator.Next()
keySuffix := l.prevRunnerSuffix + 1
l.prevRunnerSuffix = keySuffix

config := runner.Config{
Bind: l.portAllocator.Next(),
Bind: bind,
ControllerEndpoint: controllerEndpoint,
TemplateDir: templateDir(ctx),
Key: model.NewRunnerKey(),
Key: model.NewLocalRunnerKey(keySuffix),
}

name := fmt.Sprintf("runner%d", i)
simpleName := fmt.Sprintf("runner%d", config.Key.Suffix)
if err := kong.ApplyDefaults(&config, kong.Vars{
"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", name, "deployments"),
"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", simpleName, "deployments"),
"language": "go,kotlin",
}); err != nil {
return err
}

runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))
runnerCtx := log.ContextWithLogger(ctx, logger.Scope(simpleName))

runnerCtx, cancel := context.WithCancel(runnerCtx)
l.runners[config.Key] = cancel
Expand Down
8 changes: 4 additions & 4 deletions backend/controller/scheduledtask/scheduledtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func TestCron(t *testing.T) {
}

controllers := []*controller{
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8080")}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8081")}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8082")}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8083")}},
}

clock := clock.NewMock()
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1211f7d

Please sign in to comment.