Skip to content

Commit

Permalink
refactor: Juho/refactoring (#3377)
Browse files Browse the repository at this point in the history
Previously, we were `CREATE DATABASE ... ` as a response to a successful
status check. Now, we spawn a background goroutine to wait for the stack
to become ready and run any DDL commands before reporting the task as a
success.

Closes #3333

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jvmakine and github-actions[bot] authored Nov 13, 2024
1 parent 3431255 commit 6b8f7f1
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 161 deletions.
53 changes: 53 additions & 0 deletions cmd/ftl-provisioner-cloudformation/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
goformation "github.com/awslabs/goformation/v7/cloudformation"
"github.com/awslabs/goformation/v7/cloudformation/rds"
)

type PostgresTemplater struct {
resourceID string
cluster string
module string
config *Config
}

var _ ResourceTemplater = (*PostgresTemplater)(nil)

func (p *PostgresTemplater) AddToTemplate(template *goformation.Template) error {
clusterID := cloudformationResourceID(p.resourceID, "cluster")
instanceID := cloudformationResourceID(p.resourceID, "instance")
template.Resources[clusterID] = &rds.DBCluster{
Engine: ptr("aurora-postgresql"),
MasterUsername: ptr("root"),
ManageMasterUserPassword: ptr(true),
DBSubnetGroupName: ptr(p.config.DatabaseSubnetGroupARN),
VpcSecurityGroupIds: []string{p.config.DatabaseSecurityGroup},
EngineMode: ptr("provisioned"),
Port: ptr(5432),
ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{
MinCapacity: ptr(0.5),
MaxCapacity: ptr(10.0),
},
Tags: ftlTags(p.cluster, p.module),
}
template.Resources[instanceID] = &rds.DBInstance{
Engine: ptr("aurora-postgresql"),
DBInstanceClass: ptr("db.serverless"),
DBClusterIdentifier: ptr(goformation.Ref(clusterID)),
Tags: ftlTags(p.cluster, p.module),
}
addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{
ResourceID: p.resourceID,
PropertyName: PropertyPsqlWriteEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{
ResourceID: p.resourceID,
PropertyName: PropertyPsqlReadEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{
ResourceID: p.resourceID,
PropertyName: PropertyPsqlMasterUserARN,
})
return nil
}
93 changes: 37 additions & 56 deletions cmd/ftl-provisioner-cloudformation/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -13,8 +12,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
goformation "github.com/awslabs/goformation/v7/cloudformation"
cf "github.com/awslabs/goformation/v7/cloudformation/cloudformation"
"github.com/awslabs/goformation/v7/cloudformation/rds"
"github.com/awslabs/goformation/v7/cloudformation/tags"
"github.com/puzpuzpuz/xsync/v3"
"golang.org/x/text/cases"
"golang.org/x/text/language"

Expand All @@ -25,9 +24,9 @@ import (
)

const (
PropertyDBReadEndpoint = "db:read_endpoint"
PropertyDBWriteEndpoint = "db:write_endpoint"
PropertyMasterUserARN = "db:master_user_secret_arn"
PropertyPsqlReadEndpoint = "psql:read_endpoint"
PropertyPsqlWriteEndpoint = "psql:write_endpoint"
PropertyPsqlMasterUserARN = "psql:master_user_secret_arn"
)

type Config struct {
Expand All @@ -40,6 +39,8 @@ type CloudformationProvisioner struct {
client *cloudformation.Client
secrets *secretsmanager.Client
confg *Config

running *xsync.MapOf[string, *task]
}

var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil)
Expand All @@ -54,7 +55,12 @@ func NewCloudformationProvisioner(ctx context.Context, config Config) (context.C
return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err)
}

return ctx, &CloudformationProvisioner{client: client, secrets: secrets, confg: &config}, nil
return ctx, &CloudformationProvisioner{
client: client,
secrets: secrets,
confg: &config,
running: xsync.NewMapOf[string, *task](),
}, nil
}

func (c *CloudformationProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
Expand All @@ -66,24 +72,25 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.
if err != nil {
return nil, err
}
token := *res.StackId
changeSetID := *res.Id

if !updated {
return connect.NewResponse(&provisioner.ProvisionResponse{
// even if there are no changes, return the stack id so that any resource outputs can be populated
Status: provisioner.ProvisionResponse_SUBMITTED,
ProvisioningToken: *res.StackId,
ProvisioningToken: token,
}), nil
}
_, err = c.client.ExecuteChangeSet(ctx, &cloudformation.ExecuteChangeSetInput{
ChangeSetName: res.Id,
StackName: res.StackId,
})
if err != nil {
return nil, fmt.Errorf("failed to execute change-set: %w", err)
}

task := &task{stackID: token}
if _, ok := c.running.LoadOrStore(token, task); ok {
return nil, fmt.Errorf("provisioner already running: %s", token)
}
task.Start(ctx, c.client, c.secrets, changeSetID)
return connect.NewResponse(&provisioner.ProvisionResponse{
Status: provisioner.ProvisionResponse_SUBMITTED,
ProvisioningToken: *res.StackId,
ProvisioningToken: token,
}), nil
}

Expand Down Expand Up @@ -124,8 +131,18 @@ func generateChangeSetName(stack string) string {
func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionRequest) (string, error) {
template := goformation.NewTemplate()
for _, resourceCtx := range req.DesiredResources {
if err := c.resourceToCF(req.FtlClusterId, req.Module, template, resourceCtx.Resource); err != nil {
return "", err
var templater ResourceTemplater
if _, ok := resourceCtx.Resource.Resource.(*provisioner.Resource_Postgres); ok {
templater = &PostgresTemplater{
resourceID: resourceCtx.Resource.ResourceId,
cluster: req.FtlClusterId,
module: req.Module,
config: c.confg,
}
}

if err := templater.AddToTemplate(template); err != nil {
return "", fmt.Errorf("failed to add resource to template: %w", err)
}
}
// Stack can not be empty, insert a null resource to keep the stack around
Expand All @@ -140,45 +157,9 @@ func (c *CloudformationProvisioner) createTemplate(req *provisioner.ProvisionReq
return string(bytes), nil
}

func (c *CloudformationProvisioner) resourceToCF(cluster, module string, template *goformation.Template, resource *provisioner.Resource) error {
if _, ok := resource.Resource.(*provisioner.Resource_Postgres); ok {
clusterID := cloudformationResourceID(resource.ResourceId, "cluster")
instanceID := cloudformationResourceID(resource.ResourceId, "instance")
template.Resources[clusterID] = &rds.DBCluster{
Engine: ptr("aurora-postgresql"),
MasterUsername: ptr("root"),
ManageMasterUserPassword: ptr(true),
DBSubnetGroupName: ptr(c.confg.DatabaseSubnetGroupARN),
VpcSecurityGroupIds: []string{c.confg.DatabaseSecurityGroup},
EngineMode: ptr("provisioned"),
Port: ptr(5432),
ServerlessV2ScalingConfiguration: &rds.DBCluster_ServerlessV2ScalingConfiguration{
MinCapacity: ptr(0.5),
MaxCapacity: ptr(10.0),
},
Tags: ftlTags(cluster, module),
}
template.Resources[instanceID] = &rds.DBInstance{
Engine: ptr("aurora-postgresql"),
DBInstanceClass: ptr("db.serverless"),
DBClusterIdentifier: ptr(goformation.Ref(clusterID)),
Tags: ftlTags(cluster, module),
}
addOutput(template.Outputs, goformation.GetAtt(clusterID, "Endpoint.Address"), &CloudformationOutputKey{
ResourceID: resource.ResourceId,
PropertyName: PropertyDBWriteEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "ReadEndpoint.Address"), &CloudformationOutputKey{
ResourceID: resource.ResourceId,
PropertyName: PropertyDBReadEndpoint,
})
addOutput(template.Outputs, goformation.GetAtt(clusterID, "MasterUserSecret.SecretArn"), &CloudformationOutputKey{
ResourceID: resource.ResourceId,
PropertyName: PropertyMasterUserARN,
})
return nil
}
return errors.New("unsupported resource type")
// ResourceTemplater interface for different resource types
type ResourceTemplater interface {
AddToTemplate(tmpl *goformation.Template) error
}

func ftlTags(cluster, module string) []tags.Tag {
Expand Down
131 changes: 26 additions & 105 deletions cmd/ftl-provisioner-cloudformation/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,96 +2,50 @@ package main

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"

"connectrpc.com/connect"
"github.com/aws/aws-sdk-go-v2/service/cloudformation"
"github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
_ "github.com/jackc/pgx/v5/stdlib" // SQL driver

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
)

func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Request[provisioner.StatusRequest]) (*connect.Response[provisioner.StatusResponse], error) {
client, err := createClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create cloudformation client: %w", err)
token := req.Msg.ProvisioningToken
// if the task is not in the map, it means that the provisioner has crashed since starting the task
// in that case, we start a new task to query the existing stack
task, _ := c.running.LoadOrStore(token, &task{stackID: token})

if task.err.Load() != nil {
c.running.Delete(token)
return nil, connect.NewError(connect.CodeUnknown, task.err.Load())
}

desc, err := client.DescribeStacks(ctx, &cloudformation.DescribeStacksInput{
StackName: &req.Msg.ProvisioningToken,
})
if err != nil {
return nil, fmt.Errorf("failed to describe stack: %w", err)
}
stack := desc.Stacks[0]

switch stack.StackStatus {
case types.StackStatusCreateInProgress:
return running()
case types.StackStatusCreateFailed:
return failure(&stack)
case types.StackStatusCreateComplete:
return c.success(ctx, &stack, req.Msg.DesiredResources)
case types.StackStatusRollbackInProgress:
return failure(&stack)
case types.StackStatusRollbackFailed:
return failure(&stack)
case types.StackStatusRollbackComplete:
return failure(&stack)
case types.StackStatusDeleteInProgress:
return running()
case types.StackStatusDeleteFailed:
return failure(&stack)
case types.StackStatusDeleteComplete:
return c.success(ctx, &stack, req.Msg.DesiredResources)
case types.StackStatusUpdateInProgress:
return running()
case types.StackStatusUpdateCompleteCleanupInProgress:
return running()
case types.StackStatusUpdateComplete:
return c.success(ctx, &stack, req.Msg.DesiredResources)
case types.StackStatusUpdateFailed:
return failure(&stack)
case types.StackStatusUpdateRollbackInProgress:
return running()
default:
return nil, errors.New("unsupported Cloudformation status code: " + string(desc.Stacks[0].StackStatus))
}
}
if task.outputs.Load() != nil {
c.running.Delete(token)

func (c *CloudformationProvisioner) success(ctx context.Context, stack *types.Stack, resources []*provisioner.Resource) (*connect.Response[provisioner.StatusResponse], error) {
err := c.updateResources(ctx, stack.Outputs, resources)
if err != nil {
return nil, err
}
return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Success{
Success: &provisioner.StatusResponse_ProvisioningSuccess{
UpdatedResources: resources,
resources := req.Msg.DesiredResources
if err := c.updateResources(ctx, task.outputs.Load(), resources); err != nil {
return nil, err
}
return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Success{
Success: &provisioner.StatusResponse_ProvisioningSuccess{
UpdatedResources: resources,
},
},
},
}), nil
}
}), nil
}

func running() (*connect.Response[provisioner.StatusResponse], error) {
return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Running{
Running: &provisioner.StatusResponse_ProvisioningRunning{},
},
}), nil
}

func failure(stack *types.Stack) (*connect.Response[provisioner.StatusResponse], error) {
return nil, connect.NewError(connect.CodeUnknown, errors.New(*stack.StackStatusReason))
}

func outputsByResourceID(outputs []types.Output) (map[string][]types.Output, error) {
m := make(map[string][]types.Output)
for _, output := range outputs {
Expand Down Expand Up @@ -147,31 +101,15 @@ func (c *CloudformationProvisioner) updatePostgresOutputs(ctx context.Context, t
return fmt.Errorf("failed to group outputs by property name: %w", err)
}

// TODO: Move to provisioner workflow
secretARN := *byName[PropertyMasterUserARN].OutputValue
username, password, err := c.secretARNToUsernamePassword(ctx, secretARN)
// TODO: mind the secret rotation
secretARN := *byName[PropertyPsqlMasterUserARN].OutputValue
username, password, err := secretARNToUsernamePassword(ctx, c.secrets, secretARN)
if err != nil {
return fmt.Errorf("failed to get username and password from secret ARN: %w", err)
}

to.ReadDsn = endpointToDSN(byName[PropertyDBReadEndpoint].OutputValue, resourceID, 5432, username, password)
to.WriteDsn = endpointToDSN(byName[PropertyDBWriteEndpoint].OutputValue, resourceID, 5432, username, password)
adminEndpoint := endpointToDSN(byName[PropertyDBReadEndpoint].OutputValue, "postgres", 5432, username, password)

// Connect to postgres without a specific database to create the new one
db, err := sql.Open("pgx", adminEndpoint)
if err != nil {
return fmt.Errorf("failed to connect to postgres: %w", err)
}
defer db.Close()

// Create the database if it doesn't exist
if _, err := db.ExecContext(ctx, "CREATE DATABASE "+resourceID); err != nil {
// Ignore if database already exists
if !strings.Contains(err.Error(), "already exists") {
return fmt.Errorf("failed to create database: %w", err)
}
}
to.ReadDsn = endpointToDSN(byName[PropertyPsqlReadEndpoint].OutputValue, resourceID, 5432, username, password)
to.WriteDsn = endpointToDSN(byName[PropertyPsqlWriteEndpoint].OutputValue, resourceID, 5432, username, password)

return nil
}
Expand All @@ -190,20 +128,3 @@ func endpointToDSN(endpoint *string, database string, port int, username, passwo

return url.String()
}

func (c *CloudformationProvisioner) secretARNToUsernamePassword(ctx context.Context, secretARN string) (string, string, error) {
secret, err := c.secrets.GetSecretValue(ctx, &secretsmanager.GetSecretValueInput{
SecretId: &secretARN,
})
if err != nil {
return "", "", fmt.Errorf("failed to get secret value: %w", err)
}
secretString := *secret.SecretString

var secretData map[string]string
if err := json.Unmarshal([]byte(secretString), &secretData); err != nil {
return "", "", fmt.Errorf("failed to unmarshal secret data: %w", err)
}

return secretData["username"], secretData["password"], nil
}
Loading

0 comments on commit 6b8f7f1

Please sign in to comment.