Skip to content

Commit

Permalink
Revert "feat(KMS): derive keys for logs and async (#2338)"
Browse files Browse the repository at this point in the history
This reverts commit 2a3edbc.
  • Loading branch information
matt2e committed Aug 14, 2024
1 parent 2a3edbc commit 0607e55
Show file tree
Hide file tree
Showing 29 changed files with 416 additions and 300 deletions.
50 changes: 44 additions & 6 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
cf "github.com/TBD54566975/ftl/common/configuration"
frontend "github.com/TBD54566975/ftl/frontend"
"github.com/TBD54566975/ftl/internal/cors"
"github.com/TBD54566975/ftl/internal/encryption"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
Expand Down Expand Up @@ -84,6 +85,42 @@ func (c *CommonConfig) Validate() error {
return nil
}

// EncryptionKeys for the controller config.
// Deprecated: Will remove this at some stage.
type EncryptionKeys struct {
Logs string `name:"log-key" help:"Key for sensitive log data in internal FTL tables." env:"FTL_LOG_ENCRYPTION_KEY"`
Async string `name:"async-key" help:"Key for sensitive async call data in internal FTL tables." env:"FTL_ASYNC_ENCRYPTION_KEY"`
}

func (e EncryptionKeys) Encryptors(required bool) (*dal.Encryptors, error) {
encryptors := dal.Encryptors{}
if e.Logs != "" {
enc, err := encryption.NewForKeyOrURI(e.Logs)
if err != nil {
return nil, fmt.Errorf("could not create log encryptor: %w", err)
}
encryptors.Logs = enc
} else if required {
return nil, fmt.Errorf("FTL_LOG_ENCRYPTION_KEY is required")
} else {
encryptors.Logs = encryption.NoOpEncryptor{}
}

if e.Async != "" {
enc, err := encryption.NewForKeyOrURI(e.Async)
if err != nil {
return nil, fmt.Errorf("could not create async calls encryptor: %w", err)
}
encryptors.Async = enc
} else if required {
return nil, fmt.Errorf("FTL_ASYNC_ENCRYPTION_KEY is required")
} else {
encryptors.Async = encryption.NoOpEncryptor{}
}

return &encryptors, nil
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8892" env:"FTL_CONTROLLER_BIND"`
IngressBind *url.URL `help:"Socket to bind to for ingress." default:"http://127.0.0.1:8891" env:"FTL_CONTROLLER_INGRESS_BIND"`
Expand All @@ -98,7 +135,8 @@ type Config struct {
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"`
EventLogRetention *time.Duration `help:"Delete call logs after this time period. 0 to disable" env:"FTL_EVENT_LOG_RETENTION" default:"24h"`
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
KMSURI *string `help:"URI for KMS key e.g. with fake-kms:// or aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
KMSURI *url.URL `help:"URI for KMS key e.g. aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
EncryptionKeys
CommonConfig
}

Expand All @@ -112,7 +150,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB, encryptors *dal.Encryptors) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -133,7 +171,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, conn, config, runnerScaling)
svc, err := New(ctx, conn, config, runnerScaling, encryptors)
if err != nil {
return err
}
Expand Down Expand Up @@ -215,7 +253,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -229,7 +267,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, conn, optional.Ptr[string](config.KMSURI))
db, err := dal.New(ctx, conn, encryptors)
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}
Expand Down Expand Up @@ -1454,7 +1492,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
originalResult := either.RightOf[[]byte](originalError)

request := map[string]any{
"request": json.RawMessage(call.Request),
"request": call.Request,
"error": originalError,
}
body, err := json.Marshal(request)
Expand Down
8 changes: 3 additions & 5 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ import (
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

db "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdb "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/assert/v2"
"github.com/benbjohnson/clock"
)

func TestServiceWithRealDal(t *testing.T) {
Expand All @@ -28,7 +26,7 @@ func TestServiceWithRealDal(t *testing.T) {

conn := sqltest.OpenForTesting(ctx, t)
dal := db.New(conn)
parentDAL, err := parentdb.New(ctx, conn, optional.None[string]())
parentDAL, err := parentdb.New(ctx, conn, parentdb.NoOpEncryptors())
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestServiceWithMockDal(t *testing.T) {
attemptCountMap: map[string]int{},
}
conn := sqltest.OpenForTesting(ctx, t)
parentDAL, err := db.New(ctx, conn, optional.None[string]())
parentDAL, err := db.New(ctx, conn, db.NoOpEncryptors())
assert.NoError(t, err)

testServiceWithDal(ctx, t, mockDal, parentDAL, clk)
Expand Down
10 changes: 2 additions & 8 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 6 additions & 13 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dal

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/encryption"
)

type asyncOriginParseRoot struct {
Expand Down Expand Up @@ -77,7 +77,7 @@ type AsyncCall struct {
Origin AsyncOrigin
Verb schema.RefKey
CatchVerb optional.Option[schema.RefKey]
Request []byte
Request json.RawMessage
ScheduledAt time.Time
QueueDepth int64
ParentRequestKey optional.Option[string]
Expand Down Expand Up @@ -115,7 +115,8 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}

decryptedRequest, err := d.decrypt(encryption.AsyncSubKey, row.Request)
var decryptedRequest json.RawMessage
err = d.encryptors.Async.DecryptJSON(row.Request, &decryptedRequest)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
Expand Down Expand Up @@ -158,11 +159,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context,
didScheduleAnotherCall = false
switch result := result.(type) {
case either.Left[[]byte, string]: // Successful response.
encryptedResult, err := d.encrypt(encryption.AsyncSubKey, result.Get())
if err != nil {
return false, fmt.Errorf("failed to encrypt async call result: %w", err)
}
_, err = tx.db.SucceedAsyncCall(ctx, encryptedResult, call.ID)
_, err = tx.db.SucceedAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
Expand Down Expand Up @@ -227,14 +224,10 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}
request, err := d.decrypt(encryption.AsyncSubKey, row.Request)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
return &AsyncCall{
ID: row.ID,
Verb: row.Verb,
Origin: origin,
Request: request,
Request: row.Request,
}, nil
}
4 changes: 1 addition & 3 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"testing"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -15,7 +13,7 @@ import (
func TestNoCallToAcquire(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, optional.None[string]())
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)

_, err = dal.AcquireAsyncCall(ctx)
Expand Down
44 changes: 24 additions & 20 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,30 +210,35 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err
return reservation.Commit(ctx)
}

func New(ctx context.Context, conn *stdsql.DB, kmsURL optional.Option[string]) (*DAL, error) {
d := &DAL{
func New(ctx context.Context, conn *stdsql.DB, encryptors *Encryptors) (*DAL, error) {
return &DAL{
db: sql.NewDB(conn),
DeploymentChanges: pubsub.New[DeploymentNotification](),
kmsURL: kmsURL,
}

if err := d.setupEncryptor(ctx); err != nil {
return nil, fmt.Errorf("failed to setup encryptor: %w", err)
}

return d, nil
encryptors: encryptors,
}, nil
}

type DAL struct {
db sql.DBI

kmsURL optional.Option[string]
encryptor encryption.DataEncryptor
db sql.DBI
encryptors *Encryptors

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *pubsub.Topic[DeploymentNotification]
}

type Encryptors struct {
Logs encryption.Encryptable
Async encryption.Encryptable
}

// NoOpEncryptors do not encrypt potentially sensitive data.
func NoOpEncryptors() *Encryptors {
return &Encryptors{
Logs: encryption.NoOpEncryptor{},
Async: encryption.NoOpEncryptor{},
}
}

// Tx is DAL within a transaction.
type Tx struct {
*DAL
Expand Down Expand Up @@ -280,8 +285,7 @@ func (d *DAL) Begin(ctx context.Context) (*Tx, error) {
return &Tx{&DAL{
db: tx,
DeploymentChanges: d.DeploymentChanges,
kmsURL: d.kmsURL,
encryptor: d.encryptor,
encryptors: d.encryptors,
}}, nil
}

Expand Down Expand Up @@ -709,7 +713,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return dalerrs.TranslatePGError(err)
}
}
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]interface{}{
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
})
Expand Down Expand Up @@ -782,7 +786,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
})
Expand Down Expand Up @@ -1057,7 +1061,7 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
"error": log.Error,
"stack": log.Stack,
}
encryptedPayload, err := d.encryptJSON(encryption.LogsSubKey, payload)
encryptedPayload, err := d.encryptors.Logs.EncryptJSON(payload)
if err != nil {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}
Expand Down Expand Up @@ -1137,7 +1141,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
Expand Down
7 changes: 3 additions & 4 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, optional.None[string]())
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)
assert.NotZero(t, dal)
var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestDAL(t *testing.T) {
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey),
Request: []byte("{}"),
Response: []byte(`{"time":"now"}`),
Response: []byte(`{"time": "now"}`),
DestVerb: schema.Ref{Module: "time", Name: "time"},
}
t.Run("InsertCallEvent", func(t *testing.T) {
Expand Down Expand Up @@ -396,7 +396,6 @@ func normaliseEvents(events []Event) []Event {
f.Set(reflect.Zero(f.Type()))
events[i] = event
}

return events
}

Expand All @@ -408,7 +407,7 @@ func assertEventsEqual(t *testing.T, expected, actual []Event) {
func TestDeleteOldEvents(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, optional.None[string]())
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down
Loading

0 comments on commit 0607e55

Please sign in to comment.