From 8d0c162529387231e08259989a7d617b9467ea86 Mon Sep 17 00:00:00 2001
From: Juho Makinen
Date: Wed, 13 Nov 2024 13:53:20 +1100
Subject: [PATCH] chore: move postgres provisioning to a separate file

---
 .../postgres.go                               |  53 +++++++
 .../provisioner.go                            |  86 +++++-------
 cmd/ftl-provisioner-cloudformation/status.go  |  88 +++---------
 cmd/ftl-provisioner-cloudformation/task.go    | 131 ++++++++++++++++++
 go.sum                                        |   2 +
 5 files changed, 245 insertions(+), 115 deletions(-)
 create mode 100644 cmd/ftl-provisioner-cloudformation/postgres.go
 create mode 100644 cmd/ftl-provisioner-cloudformation/task.go

diff --git a/cmd/ftl-provisioner-cloudformation/postgres.go b/cmd/ftl-provisioner-cloudformation/postgres.go
new file mode 100644
index 000000000..408742260
--- /dev/null
+++ b/cmd/ftl-provisioner-cloudformation/postgres.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+	goformation "github.com/awslabs/goformation/v7/cloudformation"
+	"github.com/awslabs/goformation/v7/cloudformation/rds"
+)
+
+type PostgresTemplater struct {
+	resourceID string
+	cluster    string
+	module     string
+	config     *Config
+}
+
+var _ ResourceTemplater = (*PostgresTemplater)(nil)
+
+func (p *PostgresTemplater) AddToTemplate(template *goformation.Template) error {
+	clusterID := cloudformationResourceID(p.resourceID, "cluster")
+	instanceID := cloudformationResourceID(p.resourceID, "instance")
+	template.Resources[clusterID] = &rds.DBCluster{
+		Engine:                   ptr("aurora-postgresql"),
+		MasterUsername:           ptr("root"),
+		ManageMasterUserPassword: ptr(true),
+		DBSubnetGroupName:        ptr(p.config.DatabaseSubnetGroupARN),
+		VpcSecurityGroupIds:      []string{p.config.DatabaseSecurityGroup},
+		EngineMode:               ptr("provisioned"),
+		Port:                     ptr(5432),
+		ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{
+			MinCapacity: ptr(0.5),
+			MaxCapacity: ptr(10.0),
+		},
+		Tags: ftlTags(p.cluster, p.module),
+	}
+	template.Resources[instanceID] = &rds.DBInstance{
+		Engine:              ptr("aurora-postgresql"),
+		DBInstanceClass:     ptr("db.serverless"),
+		DBClusterIdentifier: ptr(goformation.Ref(clusterID)),
+		Tags:                ftlTags(p.cluster, p.module),
+	}
+	addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{
+		ResourceID:   p.resourceID,
+		PropertyName: PropertyDBWriteEndpoint,
+	})
+	addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{
+		ResourceID:   p.resourceID,
+		PropertyName: PropertyDBReadEndpoint,
+	})
+	addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{
+		ResourceID:   p.resourceID,
+		PropertyName: PropertyMasterUserARN,
+	})
+	return nil
+}
diff --git a/cmd/ftl-provisioner-cloudformation/provisioner.go b/cmd/ftl-provisioner-cloudformation/provisioner.go
index c91e11e26..5f7c72aa1 100644
--- a/cmd/ftl-provisioner-cloudformation/provisioner.go
+++ b/cmd/ftl-provisioner-cloudformation/provisioner.go
@@ -3,7 +3,6 @@ package main
 import (
 	"bytes"
 	"context"
-	"errors"
 	"fmt"
 	"strconv"
 	"time"
@@ -13,8 +12,8 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
 	goformation "github.com/awslabs/goformation/v7/cloudformation"
 	cf "github.com/awslabs/goformation/v7/cloudformation/cloudformation"
-	"github.com/awslabs/goformation/v7/cloudformation/rds"
 	"github.com/awslabs/goformation/v7/cloudformation/tags"
+	"github.com/puzpuzpuz/xsync/v3"
 	"golang.org/x/text/cases"
 	"golang.org/x/text/language"

@@ -40,6 +39,10 @@ type CloudformationProvisioner struct {
 	client  *cloudformation.Client
 	secrets *secretsmanager.Client
 	confg   *Config
+
+	// running tracks in-flight stack updates, keyed by stack ID.
+	// NOTE(review): must be initialised wherever the provisioner is constructed; that code is not in this patch - confirm.
+	running *xsync.MapOf[string, *task]
 }

 var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil)
@@ -66,24 +69,25 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.
 	if err != nil {
 		return nil, err
 	}
+	token := *res.StackId
+	changeSetID := *res.Id
+
 	if !updated {
 		return connect.NewResponse(&provisioner.ProvisionResponse{
 			// even if there are no changes, return the stack id so that any resource outputs can be populated
 			Status:            provisioner.ProvisionResponse_SUBMITTED,
-			ProvisioningToken: *res.StackId,
+			ProvisioningToken: token,
 		}), nil
 	}

-	_, err = c.client.ExecuteChangeSet(ctx, &cloudformation.ExecuteChangeSetInput{
-		ChangeSetName: res.Id,
-		StackName:     res.StackId,
-	})
-	if err != nil {
-		return nil, fmt.Errorf("failed to execute change-set: %w", err)
-	}
+	task := &task{stackID: token}
+	if _, ok := c.running.LoadOrStore(token, task); ok {
+		return nil, fmt.Errorf("provisioner already running: %s", token)
+	}
+	task.Start(ctx, c.client, c.secrets, changeSetID)
 	return connect.NewResponse(&provisioner.ProvisionResponse{
 		Status:            provisioner.ProvisionResponse_SUBMITTED,
-		ProvisioningToken: *res.StackId,
+		ProvisioningToken: token,
 	}), nil
 }

@@ -124,8 +128,22 @@ func generateChangeSetName(stack string) string {
 func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionRequest) (string, error) {
 	template := goformation.NewTemplate()
 	for _, resourceCtx := range req.DesiredResources {
-		if err := c.resourceToCF(req.FtlClusterId, req.Module, template, resourceCtx.Resource); err != nil {
-			return "", err
+		var templater ResourceTemplater
+		switch resourceCtx.Resource.Resource.(type) {
+		case *provisioner.Resource_Postgres:
+			templater = &PostgresTemplater{
+				resourceID: resourceCtx.Resource.ResourceId,
+				cluster:    req.FtlClusterId,
+				module:     req.Module,
+				config:     c.confg,
+			}
+		default:
+			// Without this guard templater stays nil and AddToTemplate would panic.
+			return "", fmt.Errorf("unsupported resource type: %T", resourceCtx.Resource.Resource)
+		}
+
+		if err := templater.AddToTemplate(template); err != nil {
+			return "", fmt.Errorf("failed to add resource to template: %w", err)
 		}
 	}
 	// Stack can not be empty, insert a null resource to keep the stack around
@@ -140,45 +158,9 @@ func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionReq
 	return string(bytes), nil
 }

-func (c *CloudformationProvisioner) resourceToCF(cluster, module string, template *goformation.Template, resource *provisioner.Resource) error {
-	if _, ok := resource.Resource.(*provisioner.Resource_Postgres); ok {
-		clusterID := cloudformationResourceID(resource.ResourceId, "cluster")
-		instanceID := cloudformationResourceID(resource.ResourceId, "instance")
-		template.Resources[clusterID] = &rds.DBCluster{
-			Engine:                   ptr("aurora-postgresql"),
-			MasterUsername:           ptr("root"),
-			ManageMasterUserPassword: ptr(true),
-			DBSubnetGroupName:        ptr(c.confg.DatabaseSubnetGroupARN),
-			VpcSecurityGroupIds:      []string{c.confg.DatabaseSecurityGroup},
-			EngineMode:               ptr("provisioned"),
-			Port:                     ptr(5432),
-			ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{
-				MinCapacity: ptr(0.5),
-				MaxCapacity: ptr(10.0),
-			},
-			Tags: ftlTags(cluster, module),
-		}
-		template.Resources[instanceID] = &rds.DBInstance{
-			Engine:              ptr("aurora-postgresql"),
-			DBInstanceClass:     ptr("db.serverless"),
-			DBClusterIdentifier: ptr(goformation.Ref(clusterID)),
-			Tags:                ftlTags(cluster, module),
-		}
-		addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{
-			ResourceID:   resource.ResourceId,
-			PropertyName: PropertyDBWriteEndpoint,
-		})
-		addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{
-			ResourceID:   resource.ResourceId,
-			PropertyName: PropertyDBReadEndpoint,
-		})
-		addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{
-			ResourceID:   resource.ResourceId,
-			PropertyName: PropertyMasterUserARN,
-		})
-		return nil
-	}
-	return errors.New("unsupported resource type")
+// ResourceTemplater interface for different resource types
+type ResourceTemplater interface {
+	AddToTemplate(tmpl *goformation.Template) error
 }

 func ftlTags(cluster, module string) []tags.Tag {
diff --git a/cmd/ftl-provisioner-cloudformation/status.go b/cmd/ftl-provisioner-cloudformation/status.go
index 8242f0f43..ee69e4417 100644
--- a/cmd/ftl-provisioner-cloudformation/status.go
+++ b/cmd/ftl-provisioner-cloudformation/status.go
@@ -4,13 +4,11 @@ import (
 	"context"
 	"database/sql"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/url"
 	"strings"

 	"connectrpc.com/connect"
-	"github.com/aws/aws-sdk-go-v2/service/cloudformation"
 	"github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
 	"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
 	_ "github.com/jackc/pgx/v5/stdlib" // SQL driver
@@ -19,68 +17,36 @@ import (
 )

 func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Request[provisioner.StatusRequest]) (*connect.Response[provisioner.StatusResponse], error) {
-	client, err := createClient(ctx)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create cloudformation client: %w", err)
+	token := req.Msg.ProvisioningToken
+	// If the task is not in the map, either the provisioner restarted after starting it
+	// or Provision had no changes to apply. Either way, start a task that only polls
+	// the existing stack; without it this call would report "running" forever.
+	task, loaded := c.running.LoadOrStore(token, &task{stackID: token})
+	if !loaded {
+		task.Start(ctx, c.client, c.secrets, "")
+	}
+
+	if task.err.Load() != nil {
+		c.running.Delete(token)
+		return nil, connect.NewError(connect.CodeUnknown, task.err.Load())
 	}

-	desc, err := client.DescribeStacks(ctx, &cloudformation.DescribeStacksInput{
-		StackName: &req.Msg.ProvisioningToken,
-	})
-	if err != nil {
-		return nil, fmt.Errorf("failed to describe stack: %w", err)
-	}
-	stack := desc.Stacks[0]
-
-	switch stack.StackStatus {
-	case types.StackStatusCreateInProgress:
-		return running()
-	case types.StackStatusCreateFailed:
-		return failure(&stack)
-	case types.StackStatusCreateComplete:
-		return c.success(ctx, &stack, req.Msg.DesiredResources)
-	case types.StackStatusRollbackInProgress:
-		return failure(&stack)
-	case types.StackStatusRollbackFailed:
-		return failure(&stack)
-	case types.StackStatusRollbackComplete:
-		return failure(&stack)
-	case types.StackStatusDeleteInProgress:
-		return running()
-	case types.StackStatusDeleteFailed:
-		return failure(&stack)
-	case types.StackStatusDeleteComplete:
-		return c.success(ctx, &stack, req.Msg.DesiredResources)
-	case types.StackStatusUpdateInProgress:
-		return running()
-	case types.StackStatusUpdateCompleteCleanupInProgress:
-		return running()
-	case types.StackStatusUpdateComplete:
-		return c.success(ctx, &stack, req.Msg.DesiredResources)
-	case types.StackStatusUpdateFailed:
-		return failure(&stack)
-	case types.StackStatusUpdateRollbackInProgress:
-		return running()
-	default:
-		return nil, errors.New("unsupported Cloudformation status code: " + string(desc.Stacks[0].StackStatus))
-	}
-}
+	if task.outputs.Load() != nil {
+		c.running.Delete(token)

-func (c *CloudformationProvisioner) success(ctx context.Context, stack *types.Stack, resources []*provisioner.Resource) (*connect.Response[provisioner.StatusResponse], error) {
-	err := c.updateResources(ctx, stack.Outputs, resources)
-	if err != nil {
-		return nil, err
-	}
-	return connect.NewResponse(&provisioner.StatusResponse{
-		Status: &provisioner.StatusResponse_Success{
-			Success: &provisioner.StatusResponse_ProvisioningSuccess{
-				UpdatedResources: resources,
+		resources := req.Msg.DesiredResources
+		if err := c.updateResources(ctx, task.outputs.Load(), resources); err != nil {
+			return nil, err
+		}
+		return connect.NewResponse(&provisioner.StatusResponse{
+			Status: &provisioner.StatusResponse_Success{
+				Success: &provisioner.StatusResponse_ProvisioningSuccess{
+					UpdatedResources: resources,
+				},
 			},
-		},
-	}), nil
-}
+		}), nil
+	}

-func running() (*connect.Response[provisioner.StatusResponse], error) {
 	return connect.NewResponse(&provisioner.StatusResponse{
 		Status: &provisioner.StatusResponse_Running{
 			Running: &provisioner.StatusResponse_ProvisioningRunning{},
@@ -88,10 +54,6 @@ func running() (*connect.Response[provisioner.StatusResponse], error) {
 	}), nil
 }

-func failure(stack *types.Stack) (*connect.Response[provisioner.StatusResponse], error) {
-	return nil, connect.NewError(connect.CodeUnknown, errors.New(*stack.StackStatusReason))
-}
-
 func outputsByResourceID(outputs []types.Output) (map[string][]types.Output, error) {
 	m := make(map[string][]types.Output)
 	for _, output := range outputs {
diff --git a/cmd/ftl-provisioner-cloudformation/task.go b/cmd/ftl-provisioner-cloudformation/task.go
new file mode 100644
index 000000000..481f7bef3
--- /dev/null
+++ b/cmd/ftl-provisioner-cloudformation/task.go
@@ -0,0 +1,131 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/alecthomas/atomic"
+
+	"github.com/aws/aws-sdk-go-v2/service/cloudformation"
+	"github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
+	"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
+	"github.com/jpillora/backoff"
+)
+
+// task tracks one CloudFormation stack update running in the background.
+// When the update finishes, either err or outputs is set.
+type task struct {
+	stackID string
+
+	err     atomic.Value[error]
+	outputs atomic.Value[[]types.Output]
+}
+
+// updateStack executes the change set, unless changeSetID is empty, and then
+// polls the stack until it settles, returning its outputs on success.
+func (t *task) updateStack(ctx context.Context, client *cloudformation.Client, changeSetID string) ([]types.Output, error) {
+	if changeSetID != "" {
+		_, err := client.ExecuteChangeSet(ctx, &cloudformation.ExecuteChangeSetInput{
+			ChangeSetName: &changeSetID,
+			StackName:     &t.stackID,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to execute change-set: %w", err)
+		}
+	}
+
+	retry := backoff.Backoff{
+		Min:    100 * time.Millisecond,
+		Max:    5 * time.Second,
+		Factor: 2,
+	}
+	for {
+		desc, err := client.DescribeStacks(ctx, &cloudformation.DescribeStacksInput{
+			StackName: &t.stackID,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to describe stack: %w", err)
+		}
+		if len(desc.Stacks) == 0 {
+			return nil, fmt.Errorf("stack not found: %s", t.stackID)
+		}
+		stack := desc.Stacks[0]
+
+		switch stack.StackStatus {
+		// noop while running
+		case types.StackStatusCreateInProgress:
+		case types.StackStatusUpdateInProgress:
+		case types.StackStatusUpdateCompleteCleanupInProgress:
+		case types.StackStatusUpdateRollbackInProgress:
+		case types.StackStatusDeleteInProgress:
+
+		// success
+		case types.StackStatusCreateComplete:
+			return stack.Outputs, nil
+		case types.StackStatusDeleteComplete:
+			return stack.Outputs, nil
+		case types.StackStatusUpdateComplete:
+			return stack.Outputs, nil
+
+		// failures
+		case types.StackStatusCreateFailed:
+			return nil, fmt.Errorf("stack creation failed: %s", statusReason(&stack))
+		case types.StackStatusRollbackInProgress:
+			return nil, fmt.Errorf("stack rollback in progress: %s", statusReason(&stack))
+		case types.StackStatusRollbackFailed:
+			return nil, fmt.Errorf("stack rollback failed: %s", statusReason(&stack))
+		case types.StackStatusRollbackComplete:
+			return nil, fmt.Errorf("stack rollback complete: %s", statusReason(&stack))
+		case types.StackStatusDeleteFailed:
+			return nil, fmt.Errorf("stack deletion failed: %s", statusReason(&stack))
+		case types.StackStatusUpdateFailed:
+			return nil, fmt.Errorf("stack update failed: %s", statusReason(&stack))
+		default:
+			return nil, fmt.Errorf("unsupported Cloudformation status code: %s", string(stack.StackStatus))
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil, fmt.Errorf("waiting for stack %s: %w", t.stackID, ctx.Err())
+		case <-time.After(retry.Duration()):
+		}
+	}
+}
+
+// statusReason returns the stack's status reason; the field is a pointer and may be nil.
+func statusReason(stack *types.Stack) string {
+	if stack.StackStatusReason == nil {
+		return "no status reason reported"
+	}
+	return *stack.StackStatusReason
+}
+
+// postUpdate runs after a successful stack update; it is currently a no-op.
+func (t *task) postUpdate(ctx context.Context, client *cloudformation.Client, secrets *secretsmanager.Client, outputs []types.Output) error {
+	return nil
+}
+
+// Start runs the update in a background goroutine and publishes the result
+// through t.err or t.outputs.
+func (t *task) Start(ctx context.Context, client *cloudformation.Client, secrets *secretsmanager.Client, changeSetID string) {
+	// The goroutine outlives the RPC that started it, so detach it from the
+	// request's cancellation.
+	ctx = context.WithoutCancel(ctx)
+	go func() {
+		outputs, err := t.updateStack(ctx, client, changeSetID)
+		if err != nil {
+			t.err.Store(err)
+			return
+		}
+		if err := t.postUpdate(ctx, client, secrets, outputs); err != nil {
+			t.err.Store(err)
+			return
+		}
+		if outputs == nil {
+			// Status treats nil outputs as still running.
+			outputs = []types.Output{}
+		}
+		t.outputs.Store(outputs)
+	}()
+}
diff --git a/go.sum b/go.sum
index f118264fb..df8450b14 100644
--- a/go.sum
+++ b/go.sum
@@ -392,6 +392,8 @@ go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQD
 go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
 go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
 go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
+go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
+go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
 go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
 go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=