Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: revert topic and subscription case and KMS key changes #2352

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
test-scripts:
name: Test Scripts
runs-on: ubuntu-latest
if: false
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down Expand Up @@ -66,6 +67,7 @@ jobs:
ensure-frozen-migrations:
name: Ensure Frozen Migrations
runs-on: ubuntu-latest
if: false
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -78,6 +80,7 @@ jobs:
lint:
name: Lint
runs-on: ubuntu-latest
if: false
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
50 changes: 44 additions & 6 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
cf "github.com/TBD54566975/ftl/common/configuration"
frontend "github.com/TBD54566975/ftl/frontend"
"github.com/TBD54566975/ftl/internal/cors"
"github.com/TBD54566975/ftl/internal/encryption"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
Expand Down Expand Up @@ -84,6 +85,42 @@ func (c *CommonConfig) Validate() error {
return nil
}

// EncryptionKeys for the controller config.
// Deprecated: Will remove this at some stage.
type EncryptionKeys struct {
Logs string `name:"log-key" help:"Key for sensitive log data in internal FTL tables." env:"FTL_LOG_ENCRYPTION_KEY"`
Async string `name:"async-key" help:"Key for sensitive async call data in internal FTL tables." env:"FTL_ASYNC_ENCRYPTION_KEY"`
}

func (e EncryptionKeys) Encryptors(required bool) (*dal.Encryptors, error) {
encryptors := dal.Encryptors{}
if e.Logs != "" {
enc, err := encryption.NewForKeyOrURI(e.Logs)
if err != nil {
return nil, fmt.Errorf("could not create log encryptor: %w", err)
}
encryptors.Logs = enc
} else if required {
return nil, fmt.Errorf("FTL_LOG_ENCRYPTION_KEY is required")
} else {
encryptors.Logs = encryption.NoOpEncryptor{}
}

if e.Async != "" {
enc, err := encryption.NewForKeyOrURI(e.Async)
if err != nil {
return nil, fmt.Errorf("could not create async calls encryptor: %w", err)
}
encryptors.Async = enc
} else if required {
return nil, fmt.Errorf("FTL_ASYNC_ENCRYPTION_KEY is required")
} else {
encryptors.Async = encryption.NoOpEncryptor{}
}

return &encryptors, nil
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8892" env:"FTL_CONTROLLER_BIND"`
IngressBind *url.URL `help:"Socket to bind to for ingress." default:"http://127.0.0.1:8891" env:"FTL_CONTROLLER_INGRESS_BIND"`
Expand All @@ -98,7 +135,8 @@ type Config struct {
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"`
EventLogRetention *time.Duration `help:"Delete call logs after this time period. 0 to disable" env:"FTL_EVENT_LOG_RETENTION" default:"24h"`
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
KMSURI *string `help:"URI for KMS key e.g. with fake-kms:// or aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
KMSURI *url.URL `help:"URI for KMS key e.g. aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
EncryptionKeys
CommonConfig
}

Expand All @@ -112,7 +150,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB, encryptors *dal.Encryptors) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -133,7 +171,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, conn, config, runnerScaling)
svc, err := New(ctx, conn, config, runnerScaling, encryptors)
if err != nil {
return err
}
Expand Down Expand Up @@ -215,7 +253,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -229,7 +267,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, conn, optional.Ptr[string](config.KMSURI))
db, err := dal.New(ctx, conn, encryptors)
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}
Expand Down Expand Up @@ -1454,7 +1492,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
originalResult := either.RightOf[[]byte](originalError)

request := map[string]any{
"request": json.RawMessage(call.Request),
"request": call.Request,
"error": originalError,
}
body, err := json.Marshal(request)
Expand Down
8 changes: 3 additions & 5 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ import (
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/assert/v2"
"github.com/benbjohnson/clock"
)

func TestServiceWithRealDal(t *testing.T) {
Expand All @@ -28,7 +26,7 @@ func TestServiceWithRealDal(t *testing.T) {

conn := sqltest.OpenForTesting(ctx, t)
dal := db.New(conn)
parentDAL, err := parentdb.New(ctx, conn, optional.None[string]())
parentDAL, err := parentdb.New(ctx, conn, parentdb.NoOpEncryptors())
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestServiceWithMockDal(t *testing.T) {
attemptCountMap: map[string]int{},
}
conn := sqltest.OpenForTesting(ctx, t)
parentDAL, err := db.New(ctx, conn, optional.None[string]())
parentDAL, err := db.New(ctx, conn, db.NoOpEncryptors())
assert.NoError(t, err)

testServiceWithDal(ctx, t, mockDal, parentDAL, clk)
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/sql/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 3 additions & 9 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/cronjobs/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/cronjobs/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 6 additions & 13 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dal

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/encryption"
)

type asyncOriginParseRoot struct {
Expand Down Expand Up @@ -77,7 +77,7 @@ type AsyncCall struct {
Origin AsyncOrigin
Verb schema.RefKey
CatchVerb optional.Option[schema.RefKey]
Request []byte
Request json.RawMessage
ScheduledAt time.Time
QueueDepth int64
ParentRequestKey optional.Option[string]
Expand Down Expand Up @@ -115,7 +115,8 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}

decryptedRequest, err := d.decrypt(encryption.AsyncSubKey, row.Request)
var decryptedRequest json.RawMessage
err = d.encryptors.Async.DecryptJSON(row.Request, &decryptedRequest)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
Expand Down Expand Up @@ -158,11 +159,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context,
didScheduleAnotherCall = false
switch result := result.(type) {
case either.Left[[]byte, string]: // Successful response.
encryptedResult, err := d.encrypt(encryption.AsyncSubKey, result.Get())
if err != nil {
return false, fmt.Errorf("failed to encrypt async call result: %w", err)
}
_, err = tx.db.SucceedAsyncCall(ctx, encryptedResult, call.ID)
_, err = tx.db.SucceedAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
Expand Down Expand Up @@ -227,14 +224,10 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}
request, err := d.decrypt(encryption.AsyncSubKey, row.Request)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
return &AsyncCall{
ID: row.ID,
Verb: row.Verb,
Origin: origin,
Request: request,
Request: row.Request,
}, nil
}
4 changes: 1 addition & 3 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"testing"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -15,7 +13,7 @@ import (
func TestNoCallToAcquire(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, optional.None[string]())
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)

_, err = dal.AcquireAsyncCall(ctx)
Expand Down
44 changes: 24 additions & 20 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,30 +210,35 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err
return reservation.Commit(ctx)
}

func New(ctx context.Context, conn *stdsql.DB, kmsURL optional.Option[string]) (*DAL, error) {
d := &DAL{
func New(ctx context.Context, conn *stdsql.DB, encryptors *Encryptors) (*DAL, error) {
return &DAL{
db: sql.NewDB(conn),
DeploymentChanges: pubsub.New[DeploymentNotification](),
kmsURL: kmsURL,
}

if err := d.setupEncryptor(ctx); err != nil {
return nil, fmt.Errorf("failed to setup encryptor: %w", err)
}

return d, nil
encryptors: encryptors,
}, nil
}

type DAL struct {
db sql.DBI

kmsURL optional.Option[string]
encryptor encryption.DataEncryptor
db sql.DBI
encryptors *Encryptors

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *pubsub.Topic[DeploymentNotification]
}

type Encryptors struct {
Logs encryption.Encryptable
Async encryption.Encryptable
}

// NoOpEncryptors do not encrypt potentially sensitive data.
func NoOpEncryptors() *Encryptors {
return &Encryptors{
Logs: encryption.NoOpEncryptor{},
Async: encryption.NoOpEncryptor{},
}
}

// Tx is DAL within a transaction.
type Tx struct {
*DAL
Expand Down Expand Up @@ -280,8 +285,7 @@ func (d *DAL) Begin(ctx context.Context) (*Tx, error) {
return &Tx{&DAL{
db: tx,
DeploymentChanges: d.DeploymentChanges,
kmsURL: d.kmsURL,
encryptor: d.encryptor,
encryptors: d.encryptors,
}}, nil
}

Expand Down Expand Up @@ -709,7 +713,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return dalerrs.TranslatePGError(err)
}
}
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]interface{}{
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
})
Expand Down Expand Up @@ -782,7 +786,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
})
Expand Down Expand Up @@ -1057,7 +1061,7 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
"error": log.Error,
"stack": log.Stack,
}
encryptedPayload, err := d.encryptJSON(encryption.LogsSubKey, payload)
encryptedPayload, err := d.encryptors.Logs.EncryptJSON(payload)
if err != nil {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}
Expand Down Expand Up @@ -1137,7 +1141,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
Expand Down
Loading