From 8e72149df37c2658dc20f001a075638e709cbe01 Mon Sep 17 00:00:00 2001 From: Wes Date: Thu, 12 Sep 2024 20:39:52 -0700 Subject: [PATCH] chore(refactor): extract encryption dal for sharing with other packages (#2665) Here's a rough start at this. I removed the tests, which we def don't want, but might need some eye/context on how we want to test this version (if we even want it) --------- Co-authored-by: github-actions[bot] --- backend/controller/controller.go | 10 +- backend/controller/cronjobs/cronjobs_test.go | 8 +- backend/controller/dal/async_calls.go | 10 +- backend/controller/dal/async_calls_test.go | 7 +- backend/controller/dal/dal.go | 46 +- backend/controller/dal/dal_test.go | 93 +-- backend/controller/dal/events.go | 8 +- backend/controller/dal/fsm.go | 6 +- backend/controller/dal/fsm_test.go | 7 +- .../controller/dal/internal/sql/querier.go | 3 - .../controller/dal/internal/sql/queries.sql | 17 +- .../dal/internal/sql/queries.sql.go | 41 -- backend/controller/dal/pubsub.go | 2 +- .../encryption.go => encryption/dal/dal.go} | 51 +- .../encryption/dal/internal/sql/db.go | 31 + .../encryption/dal/internal/sql/models.go | 537 ++++++++++++++++++ .../encryption/dal/internal/sql/querier.go | 19 + .../encryption/dal/internal/sql/queries.sql | 14 + .../dal/internal/sql/queries.sql.go | 53 ++ backend/controller/encryption/encryption.go | 72 +++ .../controller/encryption/encryption_test.go | 56 ++ sqlc.yaml | 7 + 22 files changed, 870 insertions(+), 228 deletions(-) rename backend/controller/{dal/encryption.go => encryption/dal/dal.go} (74%) create mode 100644 backend/controller/encryption/dal/internal/sql/db.go create mode 100644 backend/controller/encryption/dal/internal/sql/models.go create mode 100644 backend/controller/encryption/dal/internal/sql/querier.go create mode 100644 backend/controller/encryption/dal/internal/sql/queries.sql create mode 100644 backend/controller/encryption/dal/internal/sql/queries.sql.go create mode 100644 backend/controller/encryption/encryption.go create mode 100644 backend/controller/encryption/encryption_test.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 96bec9e84..1dd9dbcce 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -38,6 +38,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/console" "github.com/TBD54566975/ftl/backend/controller/cronjobs" "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/leases" leasesdal "github.com/TBD54566975/ftl/backend/controller/leases/dal" @@ -54,7 +55,7 @@ import ( frontend "github.com/TBD54566975/ftl/frontend/console" cf "github.com/TBD54566975/ftl/internal/configuration/manager" "github.com/TBD54566975/ftl/internal/cors" - "github.com/TBD54566975/ftl/internal/encryption" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" ftlhttp "github.com/TBD54566975/ftl/internal/http" "github.com/TBD54566975/ftl/internal/log" ftlmaps "github.com/TBD54566975/ftl/internal/maps" @@ -231,12 +232,13 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service config.ControllerTimeout = time.Second * 5 } - ldb := leasesdal.New(conn) - db, err := dal.New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI))) + encryptionSrv, err := encryption.New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI))) if err != nil { - return nil, fmt.Errorf("failed to create DAL: %w", err) + return nil, fmt.Errorf("failed to create encryption dal: %w", err) } + db := dal.New(ctx, conn, encryptionSrv) + ldb := leasesdal.New(conn) svc := &Service{ tasks: scheduledtask.New(ctx, key, ldb), dal: db, diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 9d65dd572..ea5dce74a 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -13,11 +13,12 @@ import ( "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" parentdal "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" - "github.com/TBD54566975/ftl/internal/encryption" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -34,8 +35,11 @@ func TestNewCronJobsForModule(t *testing.T) { key := model.NewControllerKey("localhost", strconv.Itoa(8080+1)) conn := sqltest.OpenForTesting(ctx, t) dal := dal.New(conn) - parentDAL, err := parentdal.New(ctx, conn, encryption.NewBuilder()) + + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) assert.NoError(t, err) + + parentDAL := parentdal.New(ctx, conn, encryption) moduleName := "initial" jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 909e03713..7ba0f4dbe 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -139,12 +139,12 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx c return nil, ctx, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } - decryptedRequest, err := d.decrypt(&row.Request) + decryptedRequest, err := d.encryption.Decrypt(&row.Request) if err != nil { return nil, ctx, fmt.Errorf("failed to decrypt async call request: %w", err) } - lease, leaseCtx := d.leasedal.NewLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) + lease, leaseCtx := d.leaseDAL.NewLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) return &AsyncCall{ ID: row.AsyncCallID, Verb: row.Verb, @@ -192,7 +192,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, switch result := result.(type) { case either.Left[[]byte, string]: // Successful response. var encryptedResult encryption.EncryptedAsyncColumn - err := tx.encrypt(result.Get(), &encryptedResult) + err := tx.encryption.Encrypt(result.Get(), &encryptedResult) if err != nil { return false, fmt.Errorf("failed to encrypt async call result: %w", err) } @@ -261,7 +261,7 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) { if err != nil { return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } - request, err := d.decrypt(&row.Request) + request, err := d.encryption.Decrypt(&row.Request) if err != nil { return nil, fmt.Errorf("failed to decrypt async call request: %w", err) } @@ -284,7 +284,7 @@ func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, if err != nil { return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } - decryptedRequest, err := d.decrypt(&row.Request) + decryptedRequest, err := d.encryption.Decrypt(&row.Request) if err != nil { return nil, fmt.Errorf("failed to decrypt async call request: %w", err) } diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index 3904fb209..7cf5b43c6 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -6,10 +6,11 @@ import ( "github.com/alecthomas/assert/v2" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/encryption" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -17,9 +18,11 @@ import ( func TestNoCallToAcquire(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn, encryption.NewBuilder()) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) assert.NoError(t, err) + dal := New(ctx, conn, encryption) + _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) assert.EqualError(t, err, "no pending async calls: not found") diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index ca20a7c46..8a1898cdd 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -16,12 +16,13 @@ import ( "google.golang.org/protobuf/proto" dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" + "github.com/TBD54566975/ftl/backend/controller/encryption" leasedal "github.com/TBD54566975/ftl/backend/controller/leases/dal" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/libdal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/encryption" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/maps" "github.com/TBD54566975/ftl/internal/model" @@ -201,40 +202,33 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err return reservation.Commit(ctx) } -func New(ctx context.Context, conn libdal.Connection, encryptionBuilder encryption.Builder) (*DAL, error) { +func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *DAL { var d *DAL d = &DAL{ - leasedal: leasedal.New(conn), - db: dalsql.New(conn), + leaseDAL: leasedal.New(conn), + db: dalsql.New(conn), + encryption: encryption, Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { return &DAL{ Handle: h, db: dalsql.New(h.Connection), - leasedal: leasedal.New(h.Connection), - encryptor: d.encryptor, + leaseDAL: leasedal.New(h.Connection), + encryption: d.encryption, DeploymentChanges: d.DeploymentChanges, } }), DeploymentChanges: pubsub.New[DeploymentNotification](), } - encryptor, err := encryptionBuilder.Build(ctx, d) - if err != nil { - return nil, fmt.Errorf("build encryptor: %w", err) - } - if err := d.verifyEncryptor(ctx, encryptor); err != nil { - return nil, fmt.Errorf("verify encryptor: %w", err) - } - d.encryptor = encryptor - return d, nil + + return d } type DAL struct { *libdal.Handle[DAL] db dalsql.Querier - leasedal *leasedal.DAL - - encryptor encryption.DataEncryptor + leaseDAL *leasedal.DAL + encryption *encryption.Service // DeploymentChanges is a Topic that receives changes to the deployments table. DeploymentChanges *pubsub.Topic[DeploymentNotification] @@ -611,8 +605,8 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey return libdal.TranslatePGError(err) } } - var payload encryption.EncryptedTimelineColumn - err = d.encryptJSON(map[string]interface{}{ + var payload ftlencryption.EncryptedTimelineColumn + err = d.encryption.EncryptJSON(map[string]interface{}{ "prev_min_replicas": deployment.MinReplicas, "min_replicas": minReplicas, }, &payload) @@ -685,8 +679,8 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl } } - var payload encryption.EncryptedTimelineColumn - err = d.encryptJSON(map[string]any{ + var payload ftlencryption.EncryptedTimelineColumn + err = d.encryption.EncryptJSON(map[string]any{ "min_replicas": int32(minReplicas), "replaced": replacedDeploymentKey, }, &payload) @@ -898,8 +892,8 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { "error": log.Error, "stack": log.Stack, } - var encryptedPayload encryption.EncryptedTimelineColumn - err := d.encryptJSON(payload, &encryptedPayload) + var encryptedPayload ftlencryption.EncryptedTimelineColumn + err := d.encryption.EncryptJSON(payload, &encryptedPayload) if err != nil { return fmt.Errorf("failed to encrypt log payload: %w", err) } @@ -979,8 +973,8 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if pr, ok := call.ParentRequestKey.Get(); ok { parentRequestKey = optional.Some(pr.String()) } - var payload encryption.EncryptedTimelineColumn - err := d.encryptJSON(map[string]any{ + var payload ftlencryption.EncryptedTimelineColumn + err := d.encryption.EncryptJSON(map[string]any{ "duration_ms": call.Duration.Milliseconds(), "request": call.Request, "response": call.Response, diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index ffe5255a2..56462c5ac 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -13,11 +13,12 @@ import ( "github.com/alecthomas/types/optional" "golang.org/x/sync/errgroup" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/encryption" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/sha256" @@ -27,9 +28,11 @@ import ( func TestDAL(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn, encryption.NewBuilder()) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) assert.NoError(t, err) + dal := New(ctx, conn, encryption) + var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) var testSHA = sha256.Sum(testContent) @@ -291,9 +294,11 @@ func TestDAL(t *testing.T) { func TestCreateArtefactConflict(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn, encryption.NewBuilder()) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) assert.NoError(t, err) + dal := New(ctx, conn, encryption) + idch := make(chan sha256.SHA256, 2) wg := sync.WaitGroup{} @@ -368,9 +373,11 @@ func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) { func TestDeleteOldEvents(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn, encryption.NewBuilder()) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) assert.NoError(t, err) + dal := New(ctx, conn, encryption) + var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) var testSha sha256.SHA256 @@ -459,81 +466,3 @@ func TestDeleteOldEvents(t *testing.T) { assert.Equal(t, int64(0), count) }) } - -func TestVerifyEncryption(t *testing.T) { - ctx := log.ContextWithNewDefaultLogger(context.Background()) - conn := sqltest.OpenForTesting(ctx, t) - uri := "fake-kms://CK6YwYkBElQKSAowdHlwZS5nb29nbGVhcGlzLmNvbS9nb29nbGUuY3J5cHRvLnRpbmsuQWVzR2NtS2V5EhIaEJy4TIQgfCuwxA3ZZgChp_wYARABGK6YwYkBIAE" - - t.Run("DeleteVerificationColumns", func(t *testing.T) { - dal, err := New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.NoError(t, err) - - // check that there are columns set in encryption_keys - row, err := dal.db.GetOnlyEncryptionKey(ctx) - assert.NoError(t, err) - assert.NotZero(t, row.VerifyTimeline.Ok()) - assert.NotZero(t, row.VerifyAsync.Ok()) - - // delete the columns to see if they are recreated - err = dal.db.UpdateEncryptionVerification(ctx, optional.None[encryption.EncryptedTimelineColumn](), optional.None[encryption.EncryptedAsyncColumn]()) - assert.NoError(t, err) - - dal, err = New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.NoError(t, err) - - row, err = dal.db.GetOnlyEncryptionKey(ctx) - assert.NoError(t, err) - assert.NotZero(t, row.VerifyTimeline.Ok()) - assert.NotZero(t, row.VerifyAsync.Ok()) - }) - - t.Run("DifferentKey", func(t *testing.T) { - _, err := New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.NoError(t, err) - - differentKey := "fake-kms://CJP7ksIKElQKSAowdHlwZS5nb29nbGVhcGlzLmNvbS9nb29nbGUuY3J5cHRvLnRpbmsuQWVzR2NtS2V5EhIaEJWT3z-xdW23HO7hc9vF3YoYARABGJP7ksIKIAE" - _, err = New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(differentKey))) - assert.Error(t, err) - assert.Contains(t, err.Error(), "decryption failed") - }) - - t.Run("SameKeyButWrongTimelineVerification", func(t *testing.T) { - dal, err := New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.NoError(t, err) - - err = dal.db.UpdateEncryptionVerification(ctx, optional.Some[encryption.EncryptedTimelineColumn]([]byte("123")), optional.None[encryption.EncryptedAsyncColumn]()) - assert.NoError(t, err) - _, err = New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.Error(t, err) - assert.Contains(t, err.Error(), "verification sanity") - assert.Contains(t, err.Error(), "verify timeline") - - err = dal.db.UpdateEncryptionVerification(ctx, optional.None[encryption.EncryptedTimelineColumn](), optional.Some[encryption.EncryptedAsyncColumn]([]byte("123"))) - assert.NoError(t, err) - _, err = New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.Error(t, err) - assert.Contains(t, err.Error(), "verification sanity") - assert.Contains(t, err.Error(), "verify async") - }) - - t.Run("SameKeyButEncryptWrongPlainText", func(t *testing.T) { - result, err := conn.Exec("DELETE FROM encryption_keys") - assert.NoError(t, err) - affected, err := result.RowsAffected() - assert.NoError(t, err) - assert.Equal(t, int64(1), affected) - dal, err := New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.NoError(t, err) - - encrypted := encryption.EncryptedColumn[encryption.TimelineSubKey]{} - err = dal.encrypt([]byte("123"), &encrypted) - assert.NoError(t, err) - - err = dal.db.UpdateEncryptionVerification(ctx, optional.Some(encrypted), optional.None[encryption.EncryptedAsyncColumn]()) - assert.NoError(t, err) - _, err = New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) - assert.Error(t, err) - assert.Contains(t, err.Error(), "string does not match") - }) -} diff --git a/backend/controller/dal/events.go b/backend/controller/dal/events.go index 2b7253a00..931aa0e42 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/dal/events.go @@ -348,7 +348,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo switch row.Type { case sql.EventTypeLog: var jsonPayload eventLogJSON - if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil { + if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt log event: %w", err) } @@ -370,7 +370,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo case sql.EventTypeCall: var jsonPayload eventCallJSON - if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil { + if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } var sourceVerb optional.Option[schema.Ref] @@ -395,7 +395,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo case sql.EventTypeDeploymentCreated: var jsonPayload eventDeploymentCreatedJSON - if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil { + if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentCreatedEvent{ @@ -410,7 +410,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo case sql.EventTypeDeploymentUpdated: var jsonPayload eventDeploymentUpdatedJSON - if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil { + if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentUpdatedEvent{ diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index 7fe962ce6..670fe409a 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -36,7 +36,7 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanc if encrypted { encryptedRequest = encryption.EncryptedAsyncColumn(request) } else { - err = d.encrypt(request, &encryptedRequest) + err = d.encryption.Encrypt(request, &encryptedRequest) if err != nil { return fmt.Errorf("failed to encrypt FSM request: %w", err) } @@ -148,7 +148,7 @@ func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKe func (d *DAL) SetNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string, nextState schema.RefKey, request json.RawMessage, requestType schema.Type) error { var encryptedRequest encryption.EncryptedAsyncColumn - err := d.encryptJSON(request, &encryptedRequest) + err := d.encryption.EncryptJSON(request, &encryptedRequest) if err != nil { return fmt.Errorf("failed to encrypt FSM request: %w", err) } @@ -185,7 +185,7 @@ type FSMInstance struct { // // The lease must be released by the caller. func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error) { - lease, _, err := d.leasedal.AcquireLease(ctx, leases.SystemKey("fsm_instance", fsm.String(), instanceKey), time.Second*5, optional.None[any]()) + lease, _, err := d.leaseDAL.AcquireLease(ctx, leases.SystemKey("fsm_instance", fsm.String(), instanceKey), time.Second*5, optional.None[any]()) if err != nil { return nil, fmt.Errorf("failed to acquire FSM lease: %w", err) } diff --git a/backend/controller/dal/fsm_test.go b/backend/controller/dal/fsm_test.go index 5af23510d..690658c73 100644 --- a/backend/controller/dal/fsm_test.go +++ b/backend/controller/dal/fsm_test.go @@ -8,20 +8,23 @@ import ( "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/either" + "github.com/TBD54566975/ftl/backend/controller/encryption" leasedal "github.com/TBD54566975/ftl/backend/controller/leases/dal" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/encryption" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" ) func TestSendFSMEvent(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal, err := New(ctx, conn, encryption.NewBuilder()) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) assert.NoError(t, err) + dal := New(ctx, conn, encryption) + _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index eb0d5074c..87f441bce 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -29,7 +29,6 @@ type Querier interface { CreateCronJob(ctx context.Context, arg CreateCronJobParams) error CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error - CreateOnlyEncryptionKey(ctx context.Context, key []byte) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) @@ -62,7 +61,6 @@ type Querier interface { GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) - GetOnlyEncryptionKey(ctx context.Context) (GetOnlyEncryptionKeyRow, error) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) // Retrieve routing information for a runner. @@ -105,7 +103,6 @@ type Querier interface { SucceedAsyncCall(ctx context.Context, response encryption.OptionalEncryptedAsyncColumn, iD int64) (bool, error) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobExecutionParams) error - UpdateEncryptionVerification(ctx context.Context, verifyTimeline encryption.OptionalEncryptedTimelineColumn, verifyAsync encryption.OptionalEncryptedAsyncColumn) error UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) // Upsert a runner and return the deployment ID that it is assigned to, if any. diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index 715f2d496..c57c62d92 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -741,21 +741,6 @@ SELECT * FROM topic_events WHERE id = $1::BIGINT; --- name: GetOnlyEncryptionKey :one -SELECT key, verify_timeline, verify_async -FROM encryption_keys -WHERE id = 1; - --- name: CreateOnlyEncryptionKey :exec -INSERT INTO encryption_keys (id, key) -VALUES (1, $1); - --- name: UpdateEncryptionVerification :exec -UPDATE encryption_keys -SET verify_timeline = $1, - verify_async = $2 -WHERE id = 1; - -- name: AcquireAsyncCall :one -- Reserve a pending async call for execution, returning the associated lease -- reservation key and accompanying metadata. @@ -794,4 +779,4 @@ RETURNING max_backoff, parent_request_key, trace_context, - catching; \ No newline at end of file + catching; diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index becf8e13a..b6c3f1f1f 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -246,16 +246,6 @@ func (q *Queries) CreateIngressRoute(ctx context.Context, arg CreateIngressRoute return err } -const createOnlyEncryptionKey = `-- name: CreateOnlyEncryptionKey :exec -INSERT INTO encryption_keys (id, key) -VALUES (1, $1) -` - -func (q *Queries) CreateOnlyEncryptionKey(ctx context.Context, key []byte) error { - _, err := q.db.ExecContext(ctx, createOnlyEncryptionKey, key) - return err -} - const createRequest = `-- name: CreateRequest :exec INSERT INTO requests (origin, "key", source_addr) VALUES ($1, $2, $3) @@ -1192,25 +1182,6 @@ func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDe return i, err } -const getOnlyEncryptionKey = `-- name: GetOnlyEncryptionKey :one -SELECT key, verify_timeline, verify_async -FROM encryption_keys -WHERE id = 1 -` - -type GetOnlyEncryptionKeyRow struct { - Key []byte - VerifyTimeline encryption.OptionalEncryptedTimelineColumn - VerifyAsync encryption.OptionalEncryptedAsyncColumn -} - -func (q *Queries) GetOnlyEncryptionKey(ctx context.Context) (GetOnlyEncryptionKeyRow, error) { - row := q.db.QueryRowContext(ctx, getOnlyEncryptionKey) - var i GetOnlyEncryptionKeyRow - err := row.Scan(&i.Key, &i.VerifyTimeline, &i.VerifyAsync) - return i, err -} - const getProcessList = `-- name: GetProcessList :many SELECT d.min_replicas, d.key AS deployment_key, @@ -2351,18 +2322,6 @@ func (q *Queries) UpdateCronJobExecution(ctx context.Context, arg UpdateCronJobE return err } -const updateEncryptionVerification = `-- name: UpdateEncryptionVerification :exec -UPDATE encryption_keys -SET verify_timeline = $1, - verify_async = $2 -WHERE id = 1 -` - -func (q *Queries) UpdateEncryptionVerification(ctx context.Context, verifyTimeline encryption.OptionalEncryptedTimelineColumn, verifyAsync encryption.OptionalEncryptedAsyncColumn) error { - _, err := q.db.ExecContext(ctx, updateEncryptionVerification, verifyTimeline, verifyAsync) - return err -} - const upsertController = `-- name: UpsertController :one INSERT INTO controller (key, endpoint) VALUES ($1, $2) diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 389ee3509..447735c42 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -22,7 +22,7 @@ import ( func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error { var encryptedPayload encryption.EncryptedAsyncColumn - err := d.encrypt(payload, &encryptedPayload) + err := d.encryption.Encrypt(payload, &encryptedPayload) if err != nil { return fmt.Errorf("failed to encrypt payload: %w", err) } diff --git a/backend/controller/dal/encryption.go b/backend/controller/encryption/dal/dal.go similarity index 74% rename from backend/controller/dal/encryption.go rename to backend/controller/encryption/dal/dal.go index ad374ae3f..7574b6e04 100644 --- a/backend/controller/dal/encryption.go +++ b/backend/controller/encryption/dal/dal.go @@ -2,54 +2,31 @@ package dal import ( "context" - "encoding/json" "fmt" "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/encryption/dal/internal/sql" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" ) -func (d *DAL) encrypt(cleartext []byte, dest encryption.Encrypted) error { - err := d.encryptor.Encrypt(cleartext, dest) - if err != nil { - return fmt.Errorf("failed to encrypt binary with subkey %s: %w", dest.SubKey(), err) - } - - return nil -} - -func (d *DAL) decrypt(encrypted encryption.Encrypted) ([]byte, error) { - v, err := d.encryptor.Decrypt(encrypted) - if err != nil { - return nil, fmt.Errorf("failed to decrypt binary with subkey %s: %w", encrypted.SubKey(), err) - } - - return v, nil +type DAL struct { + *libdal.Handle[DAL] + db sql.Querier } -func (d *DAL) encryptJSON(v any, dest encryption.Encrypted) error { - serialized, err := json.Marshal(v) - if err != nil { - return fmt.Errorf("failed to marshal JSON: %w", err) +func New(ctx context.Context, conn libdal.Connection) *DAL { + return &DAL{ + Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { + return &DAL{ + Handle: h, + db: sql.New(h.Connection), + } + }), + db: sql.New(conn), } - - return d.encrypt(serialized, dest) -} - -func (d *DAL) decryptJSON(encrypted encryption.Encrypted, v any) error { //nolint:unparam - decrypted, err := d.decrypt(encrypted) - if err != nil { - return fmt.Errorf("failed to decrypt json with subkey %s: %w", encrypted.SubKey(), err) - } - - if err = json.Unmarshal(decrypted, v); err != nil { - return fmt.Errorf("failed to unmarshal JSON: %w", err) - } - - return nil } func (d *DAL) EnsureKey(ctx context.Context, generateKey func() ([]byte, error)) (encryptedKey []byte, err error) { @@ -85,7 +62,7 @@ func (d *DAL) EnsureKey(ctx context.Context, generateKey func() ([]byte, error)) const verification = "FTL - Towards a 𝝺-calculus for large-scale systems" -func (d *DAL) verifyEncryptor(ctx context.Context, encryptor encryption.DataEncryptor) (err error) { +func (d *DAL) VerifyEncryptor(ctx context.Context, encryptor encryption.DataEncryptor) (err error) { tx, err := d.Begin(ctx) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) diff --git a/backend/controller/encryption/dal/internal/sql/db.go b/backend/controller/encryption/dal/internal/sql/db.go new file mode 100644 index 000000000..0e0973111 --- /dev/null +++ b/backend/controller/encryption/dal/internal/sql/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/backend/controller/encryption/dal/internal/sql/models.go b/backend/controller/encryption/dal/internal/sql/models.go new file mode 100644 index 000000000..f6e6518d4 --- /dev/null +++ b/backend/controller/encryption/dal/internal/sql/models.go @@ -0,0 +1,537 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "database/sql/driver" + "encoding/json" + "fmt" + "time" + + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/encryption" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" + "github.com/google/uuid" + "github.com/sqlc-dev/pqtype" +) + +type AsyncCallState string + +const ( + AsyncCallStatePending AsyncCallState = "pending" + AsyncCallStateExecuting AsyncCallState = "executing" + AsyncCallStateSuccess AsyncCallState = "success" + AsyncCallStateError AsyncCallState = "error" +) + +func (e *AsyncCallState) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = AsyncCallState(s) + case string: + *e = AsyncCallState(s) + default: + return fmt.Errorf("unsupported scan type for AsyncCallState: %T", src) + } + return nil +} + +type NullAsyncCallState struct { + AsyncCallState AsyncCallState + Valid bool // Valid is true if AsyncCallState is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullAsyncCallState) Scan(value interface{}) error { + if value == nil { + ns.AsyncCallState, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.AsyncCallState.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullAsyncCallState) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.AsyncCallState), nil +} + +type ControllerState string + +const ( + ControllerStateLive ControllerState = "live" + ControllerStateDead ControllerState = "dead" +) + +func (e *ControllerState) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = ControllerState(s) + case string: + *e = ControllerState(s) + default: + return fmt.Errorf("unsupported scan type for ControllerState: %T", src) + } + return nil +} + +type NullControllerState struct { + ControllerState ControllerState + Valid bool // Valid is true if ControllerState is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullControllerState) Scan(value interface{}) error { + if value == nil { + ns.ControllerState, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.ControllerState.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullControllerState) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.ControllerState), nil +} + +type EventType string + +const ( + EventTypeCall EventType = "call" + EventTypeLog EventType = "log" + EventTypeDeploymentCreated EventType = "deployment_created" + EventTypeDeploymentUpdated EventType = "deployment_updated" +) + +func (e *EventType) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = EventType(s) + case string: + *e = EventType(s) + default: + return fmt.Errorf("unsupported scan type for EventType: %T", src) + } + return nil +} + +type NullEventType struct { + EventType EventType + Valid bool // Valid is true if EventType is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullEventType) Scan(value interface{}) error { + if value == nil { + ns.EventType, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.EventType.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullEventType) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.EventType), nil +} + +type FsmStatus string + +const ( + FsmStatusRunning FsmStatus = "running" + FsmStatusCompleted FsmStatus = "completed" + FsmStatusFailed FsmStatus = "failed" +) + +func (e *FsmStatus) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = FsmStatus(s) + case string: + *e = FsmStatus(s) + default: + return fmt.Errorf("unsupported scan type for FsmStatus: %T", src) + } + return nil +} + +type NullFsmStatus struct { + FsmStatus FsmStatus + Valid bool // Valid is true if FsmStatus is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullFsmStatus) Scan(value interface{}) error { + if value == nil { + ns.FsmStatus, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.FsmStatus.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullFsmStatus) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.FsmStatus), nil +} + +type Origin string + +const ( + OriginIngress Origin = "ingress" + OriginCron Origin = "cron" + OriginPubsub Origin = "pubsub" +) + +func (e *Origin) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = Origin(s) + case string: + *e = Origin(s) + default: + return fmt.Errorf("unsupported scan type for Origin: %T", src) + } + return nil +} + +type NullOrigin struct { + Origin Origin + Valid bool // Valid is true if Origin is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullOrigin) Scan(value interface{}) error { + if value == nil { + ns.Origin, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.Origin.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullOrigin) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.Origin), nil +} + +type RunnerState string + +const ( + RunnerStateNew RunnerState = "new" + RunnerStateReserved RunnerState = "reserved" + RunnerStateAssigned RunnerState = "assigned" + RunnerStateDead RunnerState = "dead" +) + +func (e *RunnerState) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = RunnerState(s) + case string: + *e = RunnerState(s) + default: + return fmt.Errorf("unsupported scan type for RunnerState: %T", src) + } + return nil +} + +type NullRunnerState struct { + RunnerState RunnerState + Valid bool // Valid is true if RunnerState is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullRunnerState) Scan(value interface{}) error { + if value == nil { + ns.RunnerState, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.RunnerState.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullRunnerState) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.RunnerState), nil +} + +type TopicSubscriptionState string + +const ( + TopicSubscriptionStateIdle TopicSubscriptionState = "idle" + TopicSubscriptionStateExecuting TopicSubscriptionState = "executing" +) + +func (e *TopicSubscriptionState) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = TopicSubscriptionState(s) + case string: + *e = TopicSubscriptionState(s) + default: + return fmt.Errorf("unsupported scan type for TopicSubscriptionState: %T", src) + } + return nil +} + +type NullTopicSubscriptionState struct { + TopicSubscriptionState TopicSubscriptionState + Valid bool // Valid is true if TopicSubscriptionState is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullTopicSubscriptionState) Scan(value interface{}) error { + if value == nil { + ns.TopicSubscriptionState, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.TopicSubscriptionState.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullTopicSubscriptionState) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.TopicSubscriptionState), nil +} + +type Artefact struct { + ID int64 + CreatedAt time.Time + Digest []byte + Content []byte +} + +type AsyncCall struct { + ID int64 + CreatedAt time.Time + LeaseID optional.Option[int64] + Verb schema.RefKey + State AsyncCallState + Origin string + ScheduledAt time.Time + Request encryption.EncryptedAsyncColumn + Response encryption.OptionalEncryptedAsyncColumn + Error optional.Option[string] + RemainingAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] + Catching bool + ParentRequestKey optional.Option[string] + TraceContext pqtype.NullRawMessage +} + +type Controller struct { + ID int64 + Key model.ControllerKey + Created time.Time + LastSeen time.Time + State ControllerState + Endpoint string +} + +type CronJob struct { + ID int64 + Key model.CronJobKey + DeploymentID int64 + Verb string + Schedule string + StartTime time.Time + NextExecution time.Time + ModuleName string + LastExecution optional.Option[time.Time] + LastAsyncCallID optional.Option[int64] +} + +type Deployment struct { + ID int64 + CreatedAt time.Time + ModuleID int64 + Key model.DeploymentKey + Schema *schema.Module + Labels json.RawMessage + MinReplicas int32 +} + +type DeploymentArtefact struct { + ArtefactID int64 + DeploymentID int64 + CreatedAt time.Time + Executable bool + Path string +} + +type EncryptionKey struct { + ID int64 + Key []byte + CreatedAt time.Time + VerifyTimeline encryption.OptionalEncryptedTimelineColumn + VerifyAsync encryption.OptionalEncryptedAsyncColumn +} + +type FsmInstance struct { + ID int64 + CreatedAt time.Time + Fsm schema.RefKey + Key string + Status FsmStatus + CurrentState optional.Option[schema.RefKey] + DestinationState optional.Option[schema.RefKey] + AsyncCallID optional.Option[int64] + UpdatedAt time.Time +} + +type FsmNextEvent struct { + ID int64 + CreatedAt time.Time + FsmInstanceID int64 + NextState schema.RefKey + Request []byte + RequestType sqltypes.Type +} + +type IngressRoute struct { + Method string + Path string + DeploymentID int64 + Module string + Verb string +} + +type Lease struct { + ID int64 + IdempotencyKey uuid.UUID + Key leases.Key + CreatedAt time.Time + ExpiresAt time.Time + Metadata pqtype.NullRawMessage +} + +type Module struct { + ID int64 + Language string + Name string +} + +type ModuleConfiguration struct { + ID int64 + CreatedAt time.Time + Module optional.Option[string] + Name string + Value json.RawMessage +} + +type ModuleSecret struct { + ID int64 + CreatedAt time.Time + Module optional.Option[string] + Name string + Url string +} + +type Request struct { + ID int64 + Origin Origin + Key model.RequestKey + SourceAddr string +} + +type Runner struct { + ID int64 + Key model.RunnerKey + Created time.Time + LastSeen time.Time + State RunnerState + Endpoint string + ModuleName optional.Option[string] + DeploymentID int64 + Labels json.RawMessage +} + +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload encryption.EncryptedTimelineColumn + ParentRequestID optional.Option[string] +} + +type Topic struct { + ID int64 + Key model.TopicKey + CreatedAt time.Time + ModuleID int64 + Name string + Type string + Head optional.Option[int64] +} + +type TopicEvent struct { + ID int64 + CreatedAt time.Time + Key model.TopicEventKey + TopicID int64 + Payload []byte + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext pqtype.NullRawMessage +} + +type TopicSubscriber struct { + ID int64 + Key model.SubscriberKey + CreatedAt time.Time + TopicSubscriptionsID int64 + DeploymentID int64 + Sink schema.RefKey + RetryAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] +} + +type TopicSubscription struct { + ID int64 + Key model.SubscriptionKey + CreatedAt time.Time + TopicID int64 + ModuleID int64 + DeploymentID int64 + Name string + Cursor optional.Option[int64] + State TopicSubscriptionState +} diff --git a/backend/controller/encryption/dal/internal/sql/querier.go b/backend/controller/encryption/dal/internal/sql/querier.go new file mode 100644 index 000000000..b11be0425 --- /dev/null +++ b/backend/controller/encryption/dal/internal/sql/querier.go @@ -0,0 +1,19 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + + "github.com/TBD54566975/ftl/internal/encryption" +) + +type Querier interface { + CreateOnlyEncryptionKey(ctx context.Context, key []byte) error + GetOnlyEncryptionKey(ctx context.Context) (GetOnlyEncryptionKeyRow, error) + UpdateEncryptionVerification(ctx context.Context, verifyTimeline encryption.OptionalEncryptedTimelineColumn, verifyAsync encryption.OptionalEncryptedAsyncColumn) error +} + +var _ Querier = (*Queries)(nil) diff --git a/backend/controller/encryption/dal/internal/sql/queries.sql b/backend/controller/encryption/dal/internal/sql/queries.sql new file mode 100644 index 000000000..61a698aa4 --- /dev/null +++ b/backend/controller/encryption/dal/internal/sql/queries.sql @@ -0,0 +1,14 @@ +-- name: GetOnlyEncryptionKey :one +SELECT key, verify_timeline, verify_async +FROM encryption_keys +WHERE id = 1; + +-- name: CreateOnlyEncryptionKey :exec +INSERT INTO encryption_keys (id, key) +VALUES (1, $1); + +-- name: UpdateEncryptionVerification :exec +UPDATE encryption_keys +SET verify_timeline = $1, + verify_async = $2 +WHERE id = 1; diff --git a/backend/controller/encryption/dal/internal/sql/queries.sql.go b/backend/controller/encryption/dal/internal/sql/queries.sql.go new file mode 100644 index 000000000..16502e215 --- /dev/null +++ b/backend/controller/encryption/dal/internal/sql/queries.sql.go @@ -0,0 +1,53 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: queries.sql + +package sql + +import ( + "context" + + "github.com/TBD54566975/ftl/internal/encryption" +) + +const createOnlyEncryptionKey = `-- name: CreateOnlyEncryptionKey :exec +INSERT INTO encryption_keys (id, key) +VALUES (1, $1) +` + +func (q *Queries) CreateOnlyEncryptionKey(ctx context.Context, key []byte) error { + _, err := q.db.ExecContext(ctx, createOnlyEncryptionKey, key) + return err +} + +const getOnlyEncryptionKey = `-- name: GetOnlyEncryptionKey :one +SELECT key, verify_timeline, verify_async +FROM encryption_keys +WHERE id = 1 +` + +type GetOnlyEncryptionKeyRow struct { + Key []byte + VerifyTimeline encryption.OptionalEncryptedTimelineColumn + VerifyAsync encryption.OptionalEncryptedAsyncColumn +} + +func (q *Queries) GetOnlyEncryptionKey(ctx context.Context) (GetOnlyEncryptionKeyRow, error) { + row := q.db.QueryRowContext(ctx, getOnlyEncryptionKey) + var i GetOnlyEncryptionKeyRow + err := row.Scan(&i.Key, &i.VerifyTimeline, &i.VerifyAsync) + return i, err +} + +const updateEncryptionVerification = `-- name: UpdateEncryptionVerification :exec +UPDATE encryption_keys +SET verify_timeline = $1, + verify_async = $2 +WHERE id = 1 +` + +func (q *Queries) UpdateEncryptionVerification(ctx context.Context, verifyTimeline encryption.OptionalEncryptedTimelineColumn, verifyAsync encryption.OptionalEncryptedAsyncColumn) error { + _, err := q.db.ExecContext(ctx, updateEncryptionVerification, verifyTimeline, verifyAsync) + return err +} diff --git a/backend/controller/encryption/encryption.go b/backend/controller/encryption/encryption.go new file mode 100644 index 000000000..4631e6aea --- /dev/null +++ b/backend/controller/encryption/encryption.go @@ -0,0 +1,72 @@ +package encryption + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/TBD54566975/ftl/backend/controller/encryption/dal" + "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/internal/encryption" +) + +type Service struct { + encryptor encryption.DataEncryptor +} + +func New(ctx context.Context, conn libdal.Connection, encryptionBuilder encryption.Builder) (*Service, error) { + d := dal.New(ctx, conn) + + encryptor, err := encryptionBuilder.Build(ctx, d) + if err != nil { + return nil, fmt.Errorf("build encryptor: %w", err) + } + + if err := d.VerifyEncryptor(ctx, encryptor); err != nil { + return nil, fmt.Errorf("verify encryptor: %w", err) + } + + return &Service{encryptor: encryptor}, nil +} + +// EncryptJSON encrypts the given JSON object and stores it in the provided destination. +func (s *Service) EncryptJSON(v any, dest encryption.Encrypted) error { + serialized, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal JSON: %w", err) + } + + return s.Encrypt(serialized, dest) +} + +// DecryptJSON decrypts the given encrypted object and stores it in the provided destination. +func (s *Service) DecryptJSON(encrypted encryption.Encrypted, v any) error { + decrypted, err := s.Decrypt(encrypted) + if err != nil { + return fmt.Errorf("failed to decrypt json with subkey %s: %w", encrypted.SubKey(), err) + } + + if err = json.Unmarshal(decrypted, v); err != nil { + return fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + return nil +} + +func (s *Service) Encrypt(cleartext []byte, dest encryption.Encrypted) error { + err := s.encryptor.Encrypt(cleartext, dest) + if err != nil { + return fmt.Errorf("failed to encrypt binary with subkey %s: %w", dest.SubKey(), err) + } + + return nil +} + +func (s *Service) Decrypt(encrypted encryption.Encrypted) ([]byte, error) { + v, err := s.encryptor.Decrypt(encrypted) + if err != nil { + return nil, fmt.Errorf("failed to decrypt binary with subkey %s: %w", encrypted.SubKey(), err) + } + + return v, nil +} diff --git a/backend/controller/encryption/encryption_test.go b/backend/controller/encryption/encryption_test.go new file mode 100644 index 000000000..5dfc6610f --- /dev/null +++ b/backend/controller/encryption/encryption_test.go @@ -0,0 +1,56 @@ +package encryption + +import ( + "bytes" + "context" + "testing" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" + "github.com/TBD54566975/ftl/internal/log" +) + +func TestEncryptionService(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + uri := "fake-kms://CK6YwYkBElQKSAowdHlwZS5nb29nbGVhcGlzLmNvbS9nb29nbGUuY3J5cHRvLnRpbmsuQWVzR2NtS2V5EhIaEJy4TIQgfCuwxA3ZZgChp_wYARABGK6YwYkBIAE" + + t.Run("EncryptDecryptJSON", func(t *testing.T) { + service, err := New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Some(uri))) + assert.NoError(t, err) + + type TestStruct struct { + Name string + Age int + } + + original := TestStruct{Name: "John Doe", Age: 30} + var encrypted ftlencryption.EncryptedTimelineColumn + err = service.EncryptJSON(original, &encrypted) + assert.NoError(t, err) + + var decrypted TestStruct + err = service.DecryptJSON(&encrypted, &decrypted) + assert.NoError(t, err) + + assert.Equal(t, original, decrypted) + }) + + t.Run("EncryptDecryptBinary", func(t *testing.T) { + service, err := New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Some(uri))) + assert.NoError(t, err) + + original := []byte("Hello, World!") + var encrypted ftlencryption.EncryptedTimelineColumn + err = service.Encrypt(original, &encrypted) + assert.NoError(t, err) + + decrypted, err := service.Decrypt(&encrypted) + assert.NoError(t, err) + + assert.True(t, bytes.Equal(original, decrypted)) + }) +} diff --git a/sqlc.yaml b/sqlc.yaml index 6df758c70..aee5d2826 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -170,6 +170,13 @@ sql: go: <<: *gengo out: "backend/controller/leases/dal/internal/sql" + - <<: *daldir + queries: + - backend/controller/encryption/dal/internal/sql/queries.sql + gen: + go: + <<: *gengo + out: "backend/controller/encryption/dal/internal/sql" rules: - name: postgresql-query-too-costly message: "Query cost estimate is too high"