Skip to content

Commit

Permalink
chore: configurable provisioners in ftl-provisioner (#2925)
Browse files Browse the repository at this point in the history
Adds a TOML config file for configuring which provisioner plugin to use
for which resource.

There is a hard coded `noop` provisioner doing nothing. Otherwise a
binary with name `ftl-provisioner-<name>` is used as plugin.

Next, I will use Localstack to test that the Cloudformation plugin
actually creates a DB.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jvmakine and github-actions[bot] authored Oct 1, 2024
1 parent 091a2c4 commit d55777e
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 86 deletions.
9 changes: 7 additions & 2 deletions backend/provisioner/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,13 @@ func (d *Deployment) Progress(ctx context.Context) (bool, error) {
return true, err
}
}
err := next.Progress(ctx)
return d.next().Ok(), err
if next.state != TaskStateDone {
err := next.Progress(ctx)
if err != nil {
return true, err
}
}
return d.next().Ok(), nil
}

type DeploymentState struct {
Expand Down
19 changes: 19 additions & 0 deletions backend/provisioner/deployment/noop_provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package deployment

import (
"context"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
)

type NoopProvisioner struct{}

func (n *NoopProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
return "", nil
}

func (n *NoopProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
return TaskStateDone, desired, nil
}

var _ Provisioner = (*NoopProvisioner)(nil)
71 changes: 71 additions & 0 deletions backend/provisioner/deployment/plugin_provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package deployment

import (
"context"
"fmt"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
)

// PluginProvisioner delegates provisioning to an external plugin
type PluginProvisioner struct {
cmdCtx context.Context
client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient]
}

var _ Provisioner = (*PluginProvisioner)(nil)

func NewPluginProvisioner(ctx context.Context, name string) (*PluginProvisioner, error) {
client, cmdCtx, err := plugin.Spawn(
ctx,
log.Debug,
"ftl-provisioner-"+name,
".",
"ftl-provisioner-"+name,
provisionerconnect.NewProvisionerPluginServiceClient,
)
if err != nil {
return nil, fmt.Errorf("error spawning plugin: %w", err)
}

return &PluginProvisioner{
cmdCtx: cmdCtx,
client: client,
}, nil
}

func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
DesiredResources: desired,
ExistingResources: existing,
FtlClusterId: "ftl",
Module: module,
}))
if err != nil {
return "", fmt.Errorf("error calling plugin: %w", err)
}
if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED {
return resp.Msg.ProvisioningToken, nil
}
return "", nil
}

func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{
ProvisioningToken: token,
}))
if err != nil {
return "", nil, fmt.Errorf("error getting status from plugin: %w", err)
}
if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok {
return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage)
} else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok {
return TaskStateDone, success.Success.UpdatedResources, nil
}
return TaskStateRunning, nil, nil
}
107 changes: 44 additions & 63 deletions backend/provisioner/deployment/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@ import (
"context"
"fmt"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
)

// ResourceType is a type of resource used to configure provisioners
Expand All @@ -28,16 +23,60 @@ type Provisioner interface {
State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error)
}

// ProvisionerPluginConfig is a map of provisioner name to resources it supports
type ProvisionerPluginConfig struct {
// The default provisioner to use for all resources not matched here
Default string `toml:"default"`
Plugins []struct {
Name string `toml:"name"`
Resources []ResourceType `toml:"resources"`
} `toml:"plugins"`
}

func (cfg *ProvisionerPluginConfig) Validate() error {
registeredResources := map[ResourceType]bool{}
for _, plugin := range cfg.Plugins {
for _, r := range plugin.Resources {
if registeredResources[r] {
return fmt.Errorf("resource type %s is already registered. Trying to re-register for %s", r, plugin.Name)
}
registeredResources[r] = true
}
}
return nil
}

type provisionerConfig struct {
provisioner Provisioner
types []ResourceType
}

// ProvisionerRegistry contains all known resource handlers in the order they should be executed
type ProvisionerRegistry struct {
Default Provisioner
Provisioners []*provisionerConfig
}

func NewProvisionerRegistry(ctx context.Context, cfg *ProvisionerPluginConfig) (*ProvisionerRegistry, error) {
result := &ProvisionerRegistry{}
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("error validating provisioner config: %w", err)
}
for _, plugin := range cfg.Plugins {
switch plugin.Name {
case "noop":
result.Register(&NoopProvisioner{}, plugin.Resources...)
default:
provisioner, err := NewPluginProvisioner(ctx, plugin.Name)
if err != nil {
return nil, fmt.Errorf("error creating provisioner plugin %s: %w", plugin.Name, err)
}
result.Register(provisioner, plugin.Resources...)
}
}
return result, nil
}

// Register to the registry, to be executed after all the previously added handlers
func (reg *ProvisionerRegistry) Register(handler Provisioner, types ...ResourceType) {
reg.Provisioners = append(reg.Provisioners, &provisionerConfig{
Expand Down Expand Up @@ -112,61 +151,3 @@ func typeOf(r *provisioner.Resource) ResourceType {
}
return ResourceTypeUnknown
}

// PluginProvisioner delegates provisioning to an external plugin
type PluginProvisioner struct {
cmdCtx context.Context
client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient]
}

func NewPluginProvisioner(ctx context.Context, name, dir, exe string) (*PluginProvisioner, error) {
client, cmdCtx, err := plugin.Spawn(
ctx,
log.Debug,
name,
dir,
exe,
provisionerconnect.NewProvisionerPluginServiceClient,
)
if err != nil {
return nil, fmt.Errorf("error spawning plugin: %w", err)
}

return &PluginProvisioner{
cmdCtx: cmdCtx,
client: client,
}, nil
}

func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
DesiredResources: desired,
ExistingResources: existing,
FtlClusterId: "ftl",
Module: module,
}))
if err != nil {
return "", fmt.Errorf("error calling plugin: %w", err)
}
if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED {
return resp.Msg.ProvisioningToken, nil
}
return "", nil
}

func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{
ProvisioningToken: token,
}))
if err != nil {
return "", nil, fmt.Errorf("error getting status from plugin: %w", err)
}
if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok {
return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage)
} else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok {
return TaskStateDone, success.Success.UpdatedResources, nil
}
return TaskStateRunning, nil, nil
}

var _ Provisioner = (*PluginProvisioner)(nil)
9 changes: 7 additions & 2 deletions backend/provisioner/provisioner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
"github.com/alecthomas/assert/v2"
)

func TestDeploymentThroughProvisioner(t *testing.T) {
func TestDeploymentThroughNoopProvisioner(t *testing.T) {
in.Run(t,
in.WithProvisioner(),
in.WithProvisioner(`
default = "noop"
plugins = [
{ name = "noop", resources = ["postgres"] },
]
`),
in.CopyModule("echo"),
in.Deploy("echo"),
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
Expand Down
48 changes: 40 additions & 8 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package provisioner

import (
"bufio"
"context"
"fmt"
"io"
"net/url"
"os"

"connectrpc.com/connect"
"github.com/BurntSushi/toml"
"github.com/alecthomas/kong"
"golang.org/x/sync/errgroup"

Expand All @@ -19,34 +23,42 @@ import (
"github.com/TBD54566975/ftl/internal/rpc"
)

// CommonProvisionerConfig is shared config between the production controller and development server.
type CommonProvisionerConfig struct {
PluginConfigFile *os.File `name:"provisioner-plugin-config" help:"Path to the plugin configuration file." env:"FTL_PROVISIONER_PLUGIN_CONFIG_FILE"`
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8893" env:"FTL_PROVISIONER_BIND"`
Advertise *url.URL `help:"Endpoint the Provisioner should advertise (must be unique across the cluster, defaults to --bind if omitted)." env:"FTL_PROVISIONER_ADVERTISE"`
ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
CommonProvisionerConfig
}

func (c *Config) SetDefaults() {
if err := kong.ApplyDefaults(c); err != nil {
panic(err)
}
if c.Advertise == nil {
c.Advertise = c.Bind
}
}

type Service struct {
controllerClient ftlv1connect.ControllerServiceClient
// TODO: Store in a resource graph
currentResources map[string][]*provisioner.Resource
registry deployment.ProvisionerRegistry
registry *deployment.ProvisionerRegistry
}

var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil)

func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, devel bool) (*Service, error) {
func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, pluginConfig *deployment.ProvisionerPluginConfig) (*Service, error) {
registry, err := deployment.NewProvisionerRegistry(ctx, pluginConfig)
if err != nil {
return nil, fmt.Errorf("error creating provisioner registry: %w", err)
}

return &Service{
controllerClient: controllerClient,
currentResources: map[string][]*provisioner.Resource{},
registry: registry,
}, nil
}

Expand Down Expand Up @@ -126,12 +138,20 @@ func Start(ctx context.Context, config Config, devel bool) error {

controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error)

svc, err := New(ctx, config, controllerClient, devel)
pluginConfig := &deployment.ProvisionerPluginConfig{Default: "noop"}
if config.PluginConfigFile != nil {
pc, err := readPluginConfig(config.PluginConfigFile)
if err != nil {
return fmt.Errorf("error reading plugin configuration: %w", err)
}
pluginConfig = pc
}

svc, err := New(ctx, config, controllerClient, pluginConfig)
if err != nil {
return err
}
logger.Debugf("Provisioner available at: %s", config.Bind)
logger.Debugf("Advertising as %s", config.Advertise)
logger.Debugf("Using FTL endpoint: %s", config.ControllerEndpoint)

g, ctx := errgroup.WithContext(ctx)
Expand All @@ -147,6 +167,18 @@ func Start(ctx context.Context, config Config, devel bool) error {
return nil
}

func readPluginConfig(file *os.File) (*deployment.ProvisionerPluginConfig, error) {
result := deployment.ProvisionerPluginConfig{}
bytes, err := io.ReadAll(bufio.NewReader(file))
if err != nil {
return nil, fmt.Errorf("error reading plugin configuration: %w", err)
}
if err := toml.Unmarshal(bytes, &result); err != nil {
return nil, fmt.Errorf("error parsing plugin configuration: %w", err)
}
return &result, nil
}

// Deployment client calls to ftl-controller

func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftlv1.GetArtefactDiffsRequest]) (*connect.Response[ftlv1.GetArtefactDiffsResponse], error) {
Expand Down
4 changes: 4 additions & 0 deletions backend/provisioner/testdata/go/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package echo
import (
"context"
"fmt"

"github.com/TBD54566975/ftl/go-runtime/ftl"
)

var db = ftl.PostgresDatabase("echodb")

// Echo returns a greeting with the current time.
//
//ftl:verb export
Expand Down
Loading

0 comments on commit d55777e

Please sign in to comment.