Skip to content

Commit

Permalink
chore: generalise how keys are represented (#1220)
Browse files Browse the repository at this point in the history
Add a required prefix to all keys, a common mechanism for parsing into
structured data, and a way for keys to specify their desired amount of
randomness.
  • Loading branch information
alecthomas authored Apr 10, 2024
1 parent 323fed6 commit 3b62f1b
Show file tree
Hide file tree
Showing 42 changed files with 501 additions and 475 deletions.
12 changes: 6 additions & 6 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ init-db:
dbmate create
dbmate --migrations-dir backend/controller/sql/schema up

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema -- sqlc generate
# Regenerate SQLC code
build-sqlc: init-db
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- sqlc generate

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand All @@ -65,7 +65,7 @@ build-frontend: npm-install

# Rebuild VSCode extension
build-extension: npm-install
@mk {{EXTENSION_OUT}} : extensions/vscode/src -- "cd extensions/vscode && npm run compile"
@mk {{EXTENSION_OUT}} : extensions/vscode/src -- "cd extensions/vscode && npm run compile"

package-extension: build-extension
@cd extensions/vscode && vsce package
Expand Down Expand Up @@ -97,5 +97,5 @@ integration-tests *test:
go test -fullpath -count 1 -v -tags integration -run "$testName" ./integration

# Run README doc tests
test-readme:
mdcode run README.md -- bash test.sh
test-readme *args:
mdcode run {{args}} README.md -- bash test.sh
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ brew tap TBD54566975/ftl && brew install ftl
set -Eeuxo pipefail
just build ftl
export PATH="$(git rev-parse --show-toplevel)/build/release:$PATH"
export FTL_ROOT="$(git rev-parse --show-toplevel)"
export PATH="$FTL_ROOT/build/release:$PATH"
export FTL_INIT_GO_REPLACE="github.com/TBD54566975/ftl=$FTL_ROOT"
pwd
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error)
case *pbconsole.EventsQuery_Filter_Requests:
requestNames := make([]model.RequestName, 0, len(filter.Requests.Requests))
for _, request := range filter.Requests.Requests {
_, requestName, err := model.ParseRequestName(request)
requestName, err := model.ParseRequestName(request)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
Expand Down
67 changes: 33 additions & 34 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ type Service struct {
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
var zero model.ControllerKey
key := config.Key
if config.Key == zero {
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
}
config.SetDefaults()
Expand Down Expand Up @@ -249,7 +248,7 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr
func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusRequest]) (*connect.Response[ftlv1.StatusResponse], error) {
status, err := s.dal.GetStatus(ctx, req.Msg.AllControllers, req.Msg.AllRunners, req.Msg.AllIngressRoutes)
if err != nil {
return nil, fmt.Errorf("%s: %w", "could not get status", err)
return nil, fmt.Errorf("could not get status: %w", err)
}
s.routesMu.RLock()
routes := slices.FlatMap(maps.Values(s.routes), func(routes []dal.Route) (out []*ftlv1.StatusResponse_Route) {
Expand Down Expand Up @@ -334,13 +333,13 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
msg := stream.Msg()
deploymentKey, err := model.ParseDeploymentKey(msg.DeploymentKey)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("%s: %w", "invalid deployment key", err))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}
var requestName optional.Option[model.RequestName]
if msg.RequestName != nil {
_, rkey, err := model.ParseRequestName(*msg.RequestName)
rkey, err := model.ParseRequestName(*msg.RequestName)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("%s: %w", "invalid request key", err))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid request key: %w", err))
}
requestName = optional.Some(rkey)
}
Expand Down Expand Up @@ -387,7 +386,7 @@ func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.Pul
func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.UpdateDeployRequest]) (response *connect.Response[ftlv1.UpdateDeployResponse], err error) {
deploymentKey, err := model.ParseDeploymentKey(req.Msg.DeploymentKey)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("%s: %w", "invalid deployment key", err))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}

logger := s.getDeploymentLogger(ctx, deploymentKey)
Expand All @@ -400,7 +399,7 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("%s: %w", "could not set deployment replicas", err)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}

return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
Expand All @@ -424,7 +423,7 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
logger.Debugf("Deployment already exists: %s", newDeploymentKey)
} else {
logger.Errorf(err, "Could not replace deployment: %s", newDeploymentKey)
return nil, fmt.Errorf("%s: %w", "could not replace deployment", err)
return nil, fmt.Errorf("could not replace deployment: %w", err)
}
}
return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
Expand All @@ -438,14 +437,14 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
msg := stream.Msg()
endpoint, err := url.Parse(msg.Endpoint)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("%s: %w", "invalid endpoint", err))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid endpoint: %w", err))
}
if endpoint.Scheme != "http" && endpoint.Scheme != "https" {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid endpoint scheme %q", endpoint.Scheme))
}
runnerKey, err := model.ParseRunnerKey(msg.Key)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("%s: %w", "invalid key", err))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid key: %w", err))
}

runnerStr := fmt.Sprintf("%s (%s)", endpoint, runnerKey)
Expand All @@ -461,7 +460,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
err = s.pingRunner(ctx, endpoint)
if err != nil {
return nil, fmt.Errorf("%s: %w", "runner callback failed", err)
return nil, fmt.Errorf("runner callback failed: %w", err)
}
initialised = true
}
Expand Down Expand Up @@ -507,7 +506,7 @@ func (s *Service) pingRunner(ctx context.Context, endpoint *url.URL) error {
defer cancel()
err := rpc.Wait(heartbeatCtx, retry, client)
if err != nil {
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("%s: %w", "failed to connect to runner", err))
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err))
}
return nil
}
Expand Down Expand Up @@ -552,13 +551,13 @@ nextArtefact:
Artefact: ftlv1.ArtefactToProto(artefact),
Chunk: chunk[:n],
}); err != nil {
return fmt.Errorf("%s: %w", "could not send artefact chunk", err)
return fmt.Errorf("could not send artefact chunk: %w", err)
}
}
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return fmt.Errorf("%s: %w", "could not read artefact chunk", err)
return fmt.Errorf("could not read artefact chunk: %w", err)
}
}
}
Expand Down Expand Up @@ -695,7 +694,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
digest, err := sha256.ParseSHA256(artefact.Digest)
if err != nil {
logger.Errorf(err, "Invalid digest %s", artefact.Digest)
return nil, fmt.Errorf("%s: %w", "invalid digest", err)
return nil, fmt.Errorf("invalid digest: %w", err)
}
artefacts[i] = dal.DeploymentArtefact{
Executable: artefact.Executable,
Expand All @@ -713,19 +712,19 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
module, err := schema.ModuleFromProto(ms)
if err != nil {
logger.Errorf(err, "Invalid module schema")
return nil, fmt.Errorf("%s: %w", "invalid module schema", err)
return nil, fmt.Errorf("invalid module schema: %w", err)
}
module, err = s.validateModuleSchema(ctx, module)
if err != nil {
logger.Errorf(err, "Invalid module schema")
return nil, fmt.Errorf("%s: %w", "invalid module schema", err)
return nil, fmt.Errorf("invalid module schema: %w", err)
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dname, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("%s: %w", "could not create deployment", err)
return nil, fmt.Errorf("could not create deployment: %w", err)
}
deploymentLogger := s.getDeploymentLogger(ctx, dname)
deploymentLogger.Debugf("Created deployment %s", dname)
Expand All @@ -737,30 +736,30 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
func (s *Service) validateModuleSchema(ctx context.Context, module *schema.Module) (*schema.Module, error) {
existingModules, err := s.dal.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, fmt.Errorf("%s: %w", "could not get existing schemas", err)
return nil, fmt.Errorf("could not get existing schemas: %w", err)
}
schemaMap := ftlmaps.FromSlice(existingModules, func(el *schema.Module) (string, *schema.Module) { return el.Name, el })
schemaMap[module.Name] = module
fullSchema := &schema.Schema{Modules: maps.Values(schemaMap)}
schema, err := schema.ValidateModuleInSchema(fullSchema, optional.Some[*schema.Module](module))
if err != nil {
return nil, fmt.Errorf("%s: %w", "invalid schema", err)
return nil, fmt.Errorf("invalid schema: %w", err)
}
return schema.Module(module.Name).MustGet(), nil
}

func (s *Service) getDeployment(ctx context.Context, key string) (*model.Deployment, error) {
dkey, err := model.ParseDeploymentKey(key)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("%s: %w", "invalid deployment key", err))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}
deployment, err := s.dal.GetDeployment(ctx, dkey)
if errors.Is(err, pgx.ErrNoRows) {
logger := s.getDeploymentLogger(ctx, dkey)
logger.Errorf(err, "Deployment not found")
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
} else if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("%s: %w", "could not retrieve deployment", err))
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not retrieve deployment: %w", err))
}
return deployment, nil
}
Expand All @@ -783,7 +782,7 @@ func (s *Service) reapStaleRunners(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleRunners(context.Background(), s.config.RunnerTimeout)
if err != nil {
return 0, fmt.Errorf("%s: %w", "Failed to delete stale runners", err)
return 0, fmt.Errorf("failed to delete stale runners: %w", err)
} else if count > 0 {
logger.Debugf("Reaped %d stale runners", count)
}
Expand All @@ -795,7 +794,7 @@ func (s *Service) releaseExpiredReservations(ctx context.Context) (time.Duration
logger := log.FromContext(ctx)
count, err := s.dal.ExpireRunnerClaims(ctx)
if err != nil {
return 0, fmt.Errorf("%s: %w", "Failed to expire runner reservations", err)
return 0, fmt.Errorf("failed to expire runner reservations: %w", err)
} else if count > 0 {
logger.Warnf("Expired %d runner reservations", count)
}
Expand All @@ -807,7 +806,7 @@ func (s *Service) releaseExpiredReservations(ctx context.Context) (time.Duration
func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, error) {
reconciliation, err := s.dal.GetDeploymentsNeedingReconciliation(ctx)
if err != nil {
return 0, fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
return 0, fmt.Errorf("failed to get deployments needing reconciliation: %w", err)
}
oldFailures := make(map[string]int)
for k, v := range s.increaseReplicaFailures {
Expand Down Expand Up @@ -890,7 +889,7 @@ func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, erro
func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
activeDeployments, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return 0, fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
return 0, fmt.Errorf("failed to get deployments needing reconciliation: %w", err)
}

totalRunners := s.config.IdleRunners
Expand Down Expand Up @@ -975,7 +974,7 @@ func (s *Service) reapStaleControllers(ctx context.Context) (time.Duration, erro
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleControllers(context.Background(), s.config.RunnerTimeout)
if err != nil {
return 0, fmt.Errorf("%s: %w", "failed to delete stale controllers", err)
return 0, fmt.Errorf("failed to delete stale controllers: %w", err)
} else if count > 0 {
logger.Debugf("Reaped %d stale controllers", count)
}
Expand All @@ -986,7 +985,7 @@ func (s *Service) reapStaleControllers(ctx context.Context) (time.Duration, erro
func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error) {
_, err := s.dal.UpsertController(ctx, s.key, s.config.Advertise.String())
if err != nil {
return 0, fmt.Errorf("%s: %w", "failed to heartbeat controller", err)
return 0, fmt.Errorf("failed to heartbeat controller: %w", err)
}
return time.Second * 3, nil

Expand All @@ -999,7 +998,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
minReplicas int
}
moduleState := map[string]moduleStateEntry{}
moduleByDeploymentKey := map[model.DeploymentKey]string{}
moduleByDeploymentKey := map[string]string{}

// Seed the notification channel with the current deployments.
seedDeployments, err := s.dal.GetActiveDeployments(ctx)
Expand Down Expand Up @@ -1039,14 +1038,14 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
var response *ftlv1.PullSchemaResponse
// Deleted key
if deletion, ok := notification.Deleted.Get(); ok {
name := moduleByDeploymentKey[deletion]
name := moduleByDeploymentKey[deletion.String()]
response = &ftlv1.PullSchemaResponse{
ModuleName: name,
DeploymentKey: deletion.String(),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED,
}
delete(moduleState, name)
delete(moduleByDeploymentKey, deletion)
delete(moduleByDeploymentKey, deletion.String())
} else if message, ok := notification.Message.Get(); ok {
moduleSchema := message.Schema.ToProto().(*schemapb.Module) //nolint:forcetypeassert
moduleSchema.Runtime = &schemapb.ModuleRuntime{
Expand Down Expand Up @@ -1089,8 +1088,8 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
}
moduleState[message.Schema.Name] = newState
delete(moduleByDeploymentKey, message.Key) // The deployment may have changed.
moduleByDeploymentKey[message.Key] = message.Schema.Name
delete(moduleByDeploymentKey, message.Key.String()) // The deployment may have changed.
moduleByDeploymentKey[message.Key.String()] = message.Schema.Name
}

if response != nil {
Expand Down
9 changes: 4 additions & 5 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,9 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
defer tx.CommitOrRollback(ctx, &err)

existingDeployment, err := d.checkForExistingDeployments(ctx, tx, moduleSchema, artefacts)
var zero model.DeploymentKey
if err != nil {
return model.DeploymentKey{}, err
} else if existingDeployment != zero {
} else if !existingDeployment.IsZero() {
logger.Tracef("Returning existing deployment %s", existingDeployment)
return existingDeployment, nil
}
Expand Down Expand Up @@ -907,7 +906,7 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
}
var requestName optional.Option[string]
if name, ok := log.RequestName.Get(); ok {
requestName = optional.Some(string(name))
requestName = optional.Some(name.String())
}
return translatePGError(d.db.InsertLogEvent(ctx, sql.InsertLogEventParams{
DeploymentKey: log.DeploymentKey,
Expand Down Expand Up @@ -945,7 +944,7 @@ func (d *DAL) loadDeployment(ctx context.Context, deployment sql.GetDeploymentRo

func (d *DAL) CreateIngressRequest(ctx context.Context, route, addr string) (model.RequestName, error) {
name := model.NewRequestName(model.OriginIngress, route)
err := d.db.CreateIngressRequest(ctx, sql.OriginIngress, string(name), addr)
err := d.db.CreateIngressRequest(ctx, sql.OriginIngress, name, addr)
return name, err
}

Expand Down Expand Up @@ -981,7 +980,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
}
var requestName optional.Option[string]
if rn, ok := call.RequestName.Get(); ok {
requestName = optional.Some(string(rn))
requestName = optional.Some(rn.String())
}
return translatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{
DeploymentKey: call.DeploymentKey,
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestDAL(t *testing.T) {
})

t.Run("GetMissingDeployment", func(t *testing.T) {
_, err := dal.GetDeployment(ctx, model.NewDeploymentKey("test"))
_, err := dal.GetDeployment(ctx, model.NewDeploymentKey("invalid"))
assert.IsError(t, err, ErrNotFound)
})

Expand Down Expand Up @@ -162,7 +162,7 @@ func TestDAL(t *testing.T) {
})

t.Run("ReserveRunnerForInvalidDeployment", func(t *testing.T) {
_, err := dal.ReserveRunnerForDeployment(ctx, model.NewDeploymentKey("test"), time.Second, labels)
_, err := dal.ReserveRunnerForDeployment(ctx, model.NewDeploymentKey("invalid"), time.Second, labels)
assert.Error(t, err)
assert.IsError(t, err, ErrNotFound)
assert.EqualError(t, err, "deployment: not found")
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {

var request optional.Option[model.RequestName]
if reqStr, ok := entry.Attributes["request"]; ok {
_, req, err := model.ParseRequestName(reqStr)
req, err := model.ParseRequestName(reqStr)
if err == nil {
request = optional.Some(req)
}
Expand Down
Loading

0 comments on commit 3b62f1b

Please sign in to comment.