Skip to content

Commit

Permalink
chore: move postgres provisioning to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmakine committed Nov 13, 2024
1 parent ce90215 commit 8d0c162
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 115 deletions.
53 changes: 53 additions & 0 deletions cmd/ftl-provisioner-cloudformation/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
goformation "github.com/awslabs/goformation/v7/cloudformation"
"github.com/awslabs/goformation/v7/cloudformation/rds"
)

type PostgresTemplater struct {
resourceID string
cluster string
module string
config *Config
}

var _ ResourceTemplater = (*PostgresTemplater)(nil)

func (p *PostgresTemplater) AddToTemplate(template *goformation.Template) error {
clusterID := cloudformationResourceID(p.resourceID, "cluster")
instanceID := cloudformationResourceID(p.resourceID, "instance")
template.Resources[clusterID] = &rds.DBCluster{
Engine: ptr("aurora-postgresql"),
MasterUsername: ptr("root"),
ManageMasterUserPassword: ptr(true),
DBSubnetGroupName: ptr(p.config.DatabaseSubnetGroupARN),
VpcSecurityGroupIds: []string{p.config.DatabaseSecurityGroup},
EngineMode: ptr("provisioned"),
Port: ptr(5432),
ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{
MinCapacity: ptr(0.5),
MaxCapacity: ptr(10.0),
},
Tags: ftlTags(p.cluster, p.module),
}
template.Resources[instanceID] = &rds.DBInstance{
Engine: ptr("aurora-postgresql"),
DBInstanceClass: ptr("db.serverless"),
DBClusterIdentifier: ptr(goformation.Ref(clusterID)),
Tags: ftlTags(p.cluster, p.module),
}
addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{
ResourceID: p.resourceID,
PropertyName: PropertyDBWriteEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{
ResourceID: p.resourceID,
PropertyName: PropertyDBReadEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{
ResourceID: p.resourceID,
PropertyName: PropertyMasterUserARN,
})
return nil
}
80 changes: 28 additions & 52 deletions cmd/ftl-provisioner-cloudformation/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -13,8 +12,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
goformation "github.com/awslabs/goformation/v7/cloudformation"
cf "github.com/awslabs/goformation/v7/cloudformation/cloudformation"
"github.com/awslabs/goformation/v7/cloudformation/rds"
"github.com/awslabs/goformation/v7/cloudformation/tags"
"github.com/puzpuzpuz/xsync/v3"
"golang.org/x/text/cases"
"golang.org/x/text/language"

Expand All @@ -40,6 +39,8 @@ type CloudformationProvisioner struct {
client *cloudformation.Client
secrets *secretsmanager.Client
confg *Config

running *xsync.MapOf[string, *task]
}

var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil)
Expand All @@ -66,24 +67,25 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.
if err != nil {
return nil, err
}
token := *res.StackId
changeSetID := *res.Id

if !updated {
return connect.NewResponse(&provisioner.ProvisionResponse{
// even if there are no changes, return the stack id so that any resource outputs can be populated
Status: provisioner.ProvisionResponse_SUBMITTED,
ProvisioningToken: *res.StackId,
ProvisioningToken: token,
}), nil
}
_, err = c.client.ExecuteChangeSet(ctx, &cloudformation.ExecuteChangeSetInput{
ChangeSetName: res.Id,
StackName: res.StackId,
})
if err != nil {
return nil, fmt.Errorf("failed to execute change-set: %w", err)
}

task := &task{stackID: token}
if _, ok := c.running.LoadOrStore(token, task); ok {
return nil, fmt.Errorf("provisioner already running: %s", token)
}
task.Start(ctx, c.client, c.secrets, changeSetID)
return connect.NewResponse(&provisioner.ProvisionResponse{
Status: provisioner.ProvisionResponse_SUBMITTED,
ProvisioningToken: *res.StackId,
ProvisioningToken: token,
}), nil
}

Expand Down Expand Up @@ -124,8 +126,18 @@ func generateChangeSetName(stack string) string {
func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionRequest) (string, error) {
template := goformation.NewTemplate()
for _, resourceCtx := range req.DesiredResources {
if err := c.resourceToCF(req.FtlClusterId, req.Module, template, resourceCtx.Resource); err != nil {
return "", err
var templater ResourceTemplater
if _, ok := resourceCtx.Resource.Resource.(*provisioner.Resource_Postgres); ok {
templater = &PostgresTemplater{
resourceID: resourceCtx.Resource.ResourceId,
cluster: req.FtlClusterId,
module: req.Module,
config: c.confg,
}
}

if err := templater.AddToTemplate(template); err != nil {
return "", fmt.Errorf("failed to add resource to template: %w", err)
}
}
// Stack can not be empty, insert a null resource to keep the stack around
Expand All @@ -140,45 +152,9 @@ func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionReq
return string(bytes), nil
}

func (c *CloudformationProvisioner) resourceToCF(cluster, module string, template *goformation.Template, resource *provisioner.Resource) error {
if _, ok := resource.Resource.(*provisioner.Resource_Postgres); ok {
clusterID := cloudformationResourceID(resource.ResourceId, "cluster")
instanceID := cloudformationResourceID(resource.ResourceId, "instance")
template.Resources[clusterID] = &rds.DBCluster{
Engine: ptr("aurora-postgresql"),
MasterUsername: ptr("root"),
ManageMasterUserPassword: ptr(true),
DBSubnetGroupName: ptr(c.confg.DatabaseSubnetGroupARN),
VpcSecurityGroupIds: []string{c.confg.DatabaseSecurityGroup},
EngineMode: ptr("provisioned"),
Port: ptr(5432),
ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{
MinCapacity: ptr(0.5),
MaxCapacity: ptr(10.0),
},
Tags: ftlTags(cluster, module),
}
template.Resources[instanceID] = &rds.DBInstance{
Engine: ptr("aurora-postgresql"),
DBInstanceClass: ptr("db.serverless"),
DBClusterIdentifier: ptr(goformation.Ref(clusterID)),
Tags: ftlTags(cluster, module),
}
addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{
ResourceID: resource.ResourceId,
PropertyName: PropertyDBWriteEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{
ResourceID: resource.ResourceId,
PropertyName: PropertyDBReadEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{
ResourceID: resource.ResourceId,
PropertyName: PropertyMasterUserARN,
})
return nil
}
return errors.New("unsupported resource type")
// ResourceTemplater interface for different resource types
type ResourceTemplater interface {
AddToTemplate(tmpl *goformation.Template) error
}

func ftlTags(cluster, module string) []tags.Tag {
Expand Down
84 changes: 21 additions & 63 deletions cmd/ftl-provisioner-cloudformation/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"

"connectrpc.com/connect"
"github.com/aws/aws-sdk-go-v2/service/cloudformation"
"github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
_ "github.com/jackc/pgx/v5/stdlib" // SQL driver
Expand All @@ -19,79 +17,39 @@ import (
)

func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Request[provisioner.StatusRequest]) (*connect.Response[provisioner.StatusResponse], error) {
client, err := createClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create cloudformation client: %w", err)
token := req.Msg.ProvisioningToken
// if the task is not in the map, it means that the provisioner has crashed since starting the task
// in that case, we start a new task to query the existing stack
task, _ := c.running.LoadOrStore(token, &task{stackID: token})

if task.err.Load() != nil {
c.running.Delete(token)
return nil, connect.NewError(connect.CodeUnknown, task.err.Load())
}

desc, err := client.DescribeStacks(ctx, &cloudformation.DescribeStacksInput{
StackName: &req.Msg.ProvisioningToken,
})
if err != nil {
return nil, fmt.Errorf("failed to describe stack: %w", err)
}
stack := desc.Stacks[0]

switch stack.StackStatus {
case types.StackStatusCreateInProgress:
return running()
case types.StackStatusCreateFailed:
return failure(&stack)
case types.StackStatusCreateComplete:
return c.success(ctx, &stack, req.Msg.DesiredResources)
case types.StackStatusRollbackInProgress:
return failure(&stack)
case types.StackStatusRollbackFailed:
return failure(&stack)
case types.StackStatusRollbackComplete:
return failure(&stack)
case types.StackStatusDeleteInProgress:
return running()
case types.StackStatusDeleteFailed:
return failure(&stack)
case types.StackStatusDeleteComplete:
return c.success(ctx, &stack, req.Msg.DesiredResources)
case types.StackStatusUpdateInProgress:
return running()
case types.StackStatusUpdateCompleteCleanupInProgress:
return running()
case types.StackStatusUpdateComplete:
return c.success(ctx, &stack, req.Msg.DesiredResources)
case types.StackStatusUpdateFailed:
return failure(&stack)
case types.StackStatusUpdateRollbackInProgress:
return running()
default:
return nil, errors.New("unsupported Cloudformation status code: " + string(desc.Stacks[0].StackStatus))
}
}
if task.outputs.Load() != nil {
c.running.Delete(token)

func (c *CloudformationProvisioner) success(ctx context.Context, stack *types.Stack, resources []*provisioner.Resource) (*connect.Response[provisioner.StatusResponse], error) {
err := c.updateResources(ctx, stack.Outputs, resources)
if err != nil {
return nil, err
}
return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Success{
Success: &provisioner.StatusResponse_ProvisioningSuccess{
UpdatedResources: resources,
resources := req.Msg.DesiredResources
if err := c.updateResources(ctx, task.outputs.Load(), resources); err != nil {
return nil, err
}
return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Success{
Success: &provisioner.StatusResponse_ProvisioningSuccess{
UpdatedResources: resources,
},
},
},
}), nil
}
}), nil
}

func running() (*connect.Response[provisioner.StatusResponse], error) {
return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Running{
Running: &provisioner.StatusResponse_ProvisioningRunning{},
},
}), nil
}

func failure(stack *types.Stack) (*connect.Response[provisioner.StatusResponse], error) {
return nil, connect.NewError(connect.CodeUnknown, errors.New(*stack.StackStatusReason))
}

func outputsByResourceID(outputs []types.Output) (map[string][]types.Output, error) {
m := make(map[string][]types.Output)
for _, output := range outputs {
Expand Down
Loading

0 comments on commit 8d0c162

Please sign in to comment.