Skip to content

Commit

Permalink
Use pubsub for schema changes
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Mar 11, 2024
1 parent a16d9c5 commit cb51456
Showing 1 changed file with 42 additions and 33 deletions.
75 changes: 42 additions & 33 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/pubsub"
"github.com/jpillora/backoff"
"github.com/puzpuzpuz/xsync/v3"
"golang.org/x/exp/maps"
Expand All @@ -22,12 +23,18 @@ import (
"github.com/TBD54566975/ftl/internal/rpc"
)

type schemaChange struct {
ChangeType ftlv1.DeploymentChangeType
*schema.Module
}

// Engine for building a set of modules.
type Engine struct {
client ftlv1connect.ControllerServiceClient
modules map[string]Module
dirs []string
controllerSchema *xsync.MapOf[string, *schema.Module]
schemaChanges *pubsub.Topic[schemaChange]
cancel func()
}

Expand All @@ -45,6 +52,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs
dirs: dirs,
modules: map[string]Module{},
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
}
e.controllerSchema.Store("builtin", schema.Builtins())
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -96,9 +104,11 @@ func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context,
return err
}
e.controllerSchema.Store(sch.Name, sch)
e.schemaChanges.Publish(schemaChange{ChangeType: msg.ChangeType, Module: sch})

case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
e.controllerSchema.Delete(msg.ModuleName)
e.schemaChanges.Publish(schemaChange{ChangeType: msg.ChangeType, Module: nil})
}
return nil
}
Expand Down Expand Up @@ -191,47 +201,22 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
return true
})

// Watch for schema changes from the FTL controller.
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, e.client.PullSchema, func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
if msg.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED {
return nil // Early return if the condition for inversion is met
}

module, err := schema.ModuleFromProto(msg.Schema)
if err != nil {
return err
}
hash, err := computeModuleHash(module)
if err != nil {
logger.Errorf(err, "compute hash for %s failed", module.Name)
return nil
}
if bytes.Equal(hash, moduleHashes[msg.ModuleName]) {
return nil
}

moduleHashes[msg.ModuleName] = hash
modulesToDeploy := e.getDependentModules(msg.ModuleName)
logger.Infof("%s's schema changed; redeploying: %+v", msg.ModuleName, modulesToDeploy)
err = e.buildAndDeploy(ctx, 1, true, modulesToDeploy...)
if err != nil {
logger.Errorf(err, "deploy %s failed", msg.ModuleName)
}

return nil
})
schemaChanges := make(chan schemaChange, 128)
e.schemaChanges.Subscribe(schemaChanges)
defer e.schemaChanges.Unsubscribe(schemaChanges)

events := make(chan WatchEvent, 128)
watchEvents := make(chan WatchEvent, 128)
watch := Watch(ctx, period, e.dirs...)
watch.Subscribe(watchEvents)
defer watch.Unsubscribe(watchEvents)
defer watch.Close()
watch.Subscribe(events)

// Watch for file system changes.
// Watch for file and schema changes
for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-events:
case event := <-watchEvents:
switch event := event.(type) {
case WatchEventModuleAdded:
if _, exists := e.modules[event.Module.Module]; !exists {
Expand All @@ -254,7 +239,31 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
logger.Errorf(err, "deploy %s failed", event.Module.Module)
}
}
case change := <-schemaChanges:
if change.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED {
continue
}

hash, err := computeModuleHash(change.Module)
if err != nil {
logger.Errorf(err, "compute hash for %s failed", change.Name)
continue
}

if bytes.Equal(hash, moduleHashes[change.Name]) {
logger.Tracef("schema for %s has not changed", change.Name)
continue
}

moduleHashes[change.Name] = hash
modulesToDeploy := e.getDependentModules(change.Name)
logger.Infof("%s's schema changed; redeploying: %+v", change.Name, modulesToDeploy)
err = e.buildAndDeploy(ctx, 1, true, modulesToDeploy...)
if err != nil {
logger.Errorf(err, "deploy %s failed", change.Name)
}
}

}
}

Expand Down

0 comments on commit cb51456

Please sign in to comment.