Skip to content

Commit

Permalink
hybrid registry implementation and integration
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanj-square committed Oct 30, 2024
1 parent 46fdd15 commit 9003f9c
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 47 deletions.
9 changes: 7 additions & 2 deletions backend/controller/artefacts/artefact.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package artefacts

import (
"context"
"github.com/TBD54566975/ftl/backend/libdal"
"io"

"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -31,15 +32,19 @@ type ReleaseArtefact struct {
Executable bool
}

type Registry interface {
type Service interface {
// GetDigestsKeys locates the `digests` corresponding `ArtefactKey`s and identifies the missing ones
GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error)
// Upload pushes the specified media, and metadata, to the registry and returns the computed digest
Upload(context context.Context, artefact Artefact) (sha256.SHA256, error)
// Download performs a streaming download of the artefact identified by the supplied digest
Download(context context.Context, digest sha256.SHA256) (io.ReadCloser, error)
// GetReleaseArtefacts locates the artefacts metadata corresponding with the specified release
GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ArtefactKey, error)
GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error)
// AddReleaseArtefact associates the given `release` with the artefact associated with the given `digest`
AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error
}

func New(c ContainerConfig, conn libdal.Connection) Service {
return newHybridRegistry(c, conn)
}
28 changes: 11 additions & 17 deletions backend/controller/artefacts/dal_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,24 @@ type ArtefactRow struct {
Digest []byte
}

type DAL interface {
GetArtefactDigests(ctx context.Context, digests [][]byte) ([]ArtefactRow, error)
CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error)
GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error)
}

type Service struct {
*libdal.Handle[Service]
type dalRegistry struct {
*libdal.Handle[dalRegistry]
db sql.Querier
}

func New(conn libdal.Connection) *Service {
return &Service{
func newDALRegistry(conn libdal.Connection) *dalRegistry {
return &dalRegistry{
db: sql.New(conn),
Handle: libdal.New(conn, func(h *libdal.Handle[Service]) *Service {
return &Service{
Handle: libdal.New(conn, func(h *libdal.Handle[dalRegistry]) *dalRegistry {
return &dalRegistry{
Handle: h,
db: sql.New(h.Connection),
}
}),
}
}

func (s *Service) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
func (s *dalRegistry) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
have, err := s.db.GetArtefactDigests(ctx, sha256esToBytes(digests))
if err != nil {
return nil, nil, libdal.TranslatePGError(err)
Expand All @@ -57,13 +51,13 @@ func (s *Service) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (
return keys, missing, nil
}

func (s *Service) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
func (s *dalRegistry) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
sha256digest := sha256.Sum(artefact.Content)
_, err := s.db.CreateArtefact(ctx, sha256digest[:], artefact.Content)
return sha256digest, libdal.TranslatePGError(err)
}

func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
func (s *dalRegistry) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
digests := [][]byte{digest[:]}
rows, err := s.db.GetArtefactDigests(ctx, digests)
if err != nil {
Expand All @@ -75,7 +69,7 @@ func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCl
return &dalArtefactStream{db: s.db, id: rows[0].ID}, nil
}

func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
func (s *dalRegistry) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}

Expand All @@ -93,7 +87,7 @@ func getDatabaseReleaseArtefacts(ctx context.Context, db sql.Querier, releaseID
}), nil
}

func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
func (s *dalRegistry) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
return addReleaseArtefacts(ctx, s.db, key, ra)
}

Expand Down
68 changes: 68 additions & 0 deletions backend/controller/artefacts/hybrid_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package artefacts

import (
"context"
"fmt"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
"io"
)

type hybridRegistry struct {
container *containerRegistry
dal *dalRegistry
Handle *libdal.Handle[hybridRegistry]
}

func newHybridRegistry(c ContainerConfig, conn libdal.Connection) *hybridRegistry {
return &hybridRegistry{
container: newContainerRegistry(c, conn),
dal: newDALRegistry(conn),
Handle: libdal.New(conn, func(h *libdal.Handle[hybridRegistry]) *hybridRegistry {
return &hybridRegistry{
container: newContainerRegistry(c, h.Connection),
dal: newDALRegistry(h.Connection),
Handle: h,
}
}),
}
}

// GetDigestsKeys locates the `ArtefactKey` for each digest from the container store and database store. The entries
// located on the container store take precedent.
func (s *hybridRegistry) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
ck, cm, err := s.container.GetDigestsKeys(ctx, digests)
if err != nil {
return nil, nil, fmt.Errorf("unable to get digests keys from container store: %w", err)
}
dk, dm, err := s.dal.GetDigestsKeys(ctx, cm)
if err != nil {
return nil, nil, fmt.Errorf("unable to get digests keys from database store: %w", err)
}
return append(ck, dk...), dm, nil
}

func (s *hybridRegistry) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
return s.container.Upload(ctx, artefact)
}

func (s *hybridRegistry) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
present, _, err := s.container.GetDigestsKeys(ctx, []sha256.SHA256{digest})
if err != nil {
return nil, fmt.Errorf("unable to verify artifact's (%s) presence in the container store: %w", digest, err)
}
if len(present) == 1 {
return s.container.Download(ctx, digest)
}
return s.dal.Download(ctx, digest)
}

func (s *hybridRegistry) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
// note: the container and database store currently use release_artefacts to associated
return s.container.GetReleaseArtefacts(ctx, releaseID)
}

func (s *hybridRegistry) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
return s.container.AddReleaseArtefact(ctx, key, ra)
}
22 changes: 11 additions & 11 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ type ContainerConfig struct {
AllowPlainHTTP bool `help:"Allows OCI container requests to accept plain HTTP responses" env:"FTL_ARTEFACTS_ALLOW_HTTP"`
}

type ContainerService struct {
type containerRegistry struct {
host string
repoFactory func() (*remote.Repository, error)

// in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table
Handle *libdal.Handle[ContainerService]
Handle *libdal.Handle[containerRegistry]
db sql.Querier
}

Expand All @@ -56,7 +56,7 @@ type ArtefactBlobs struct {
Size int64
}

func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService {
func newContainerRegistry(c ContainerConfig, conn libdal.Connection) *containerRegistry {
// Connect the registry targeting the specified container
repoFactory := func() (*remote.Repository, error) {
ref := fmt.Sprintf("%s/%s", c.Registry, ModuleBlobsPrefix)
Expand All @@ -78,11 +78,11 @@ func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerSe
return reg, nil
}

return &ContainerService{
return &containerRegistry{
host: c.Registry,
repoFactory: repoFactory,
Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService {
return &ContainerService{
Handle: libdal.New(conn, func(h *libdal.Handle[containerRegistry]) *containerRegistry {
return &containerRegistry{
host: c.Registry,
repoFactory: repoFactory,
Handle: h,
Expand All @@ -92,7 +92,7 @@ func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerSe
}
}

func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
func (s *containerRegistry) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
repo, err := s.repoFactory()
if err != nil {
return nil, nil, fmt.Errorf("unable to connect to container registry '%s': %w", s.host, err)
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.
}

// Upload uploads the specific artifact as a raw blob and links it to a manifest to prevent GC
func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
func (s *containerRegistry) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
repo, err := s.repoFactory()
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s': %w", s.host, err)
Expand All @@ -134,7 +134,7 @@ func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha25
return artefact.Digest, nil
}

func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
func (s *containerRegistry) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
ref := createModuleRepositoryPathFromDigest(digest)
registry, err := s.repoFactory()
if err != nil {
Expand All @@ -147,11 +147,11 @@ func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (
return stream, nil
}

func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
func (s *containerRegistry) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}

func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
func (s *containerRegistry) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
return addReleaseArtefacts(ctx, s.db, key, ra)
}

Expand Down
9 changes: 6 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type Config struct {
KMSURI *string `help:"URI for KMS key e.g. with fake-kms:// or aws-kms://arn:aws:kms:ap-southeast-2:12345:key/0000-1111" env:"FTL_KMS_URI"`
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"`
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`
ContainerConfig artefacts.ContainerConfig
CommonConfig
}

Expand Down Expand Up @@ -225,7 +226,7 @@ type Service struct {
tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
pubSub *pubsub.Service
registry *artefacts.Service
registry artefacts.Service
timeline *timeline.Service
controllerListListeners []ControllerListListener

Expand Down Expand Up @@ -279,13 +280,15 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub

svc.registry = artefacts.New(conn)
svc.registry = artefacts.New(config.ContainerConfig, conn)

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc
svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc)
svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc, func(c libdal.Connection) artefacts.Service {
return artefacts.New(config.ContainerConfig, conn)
})

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

Expand Down
5 changes: 4 additions & 1 deletion backend/controller/cronjobs/internal/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -48,7 +49,9 @@ func TestNewCronJobsForModule(t *testing.T) {
cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs, func(c libdal.Connection) artefacts.Service {
return nil
})
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute
decls := []schema.Decl{}
Expand Down
5 changes: 4 additions & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dal

import (
"context"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"testing"

"github.com/alecthomas/assert/v2"
Expand All @@ -23,7 +24,9 @@ func TestNoCallToAcquire(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, nil)
dal := New(ctx, conn, encryption, pubSub, nil, func(c libdal.Connection) artefacts.Service {
return nil
})

_, _, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, libdal.ErrNotFound)
Expand Down
8 changes: 4 additions & 4 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ type Reservation interface {
Rollback(ctx context.Context) error
}

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service, cron *cronjobs.Service) *DAL {
func New(_ context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service, cron *cronjobs.Service, registryFactory func(c libdal.Connection) aregistry.Service) *DAL {
var d *DAL
db := dalsql.New(conn)
d = &DAL{
leaser: dbleaser.NewDatabaseLeaser(conn),
db: db,
encryption: encryption,
registry: aregistry.New(conn),
registry: registryFactory(conn),
Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL {
return &DAL{
Handle: h,
db: dalsql.New(h.Connection),
leaser: dbleaser.NewDatabaseLeaser(h.Connection),
pubsub: pubsub,
encryption: d.encryption,
registry: aregistry.New(h.Connection),
registry: registryFactory(h.Connection),
DeploymentChanges: d.DeploymentChanges,
cronjobs: cron,
}
Expand All @@ -85,7 +85,7 @@ type DAL struct {
leaser *dbleaser.DatabaseLeaser
pubsub *pubsub.Service
encryption *encryption.Service
registry *aregistry.Service
registry aregistry.Service
cronjobs *cronjobs.Service

// DeploymentChanges is a Topic that receives changes to the deployments table.
Expand Down
8 changes: 6 additions & 2 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func TestDAL(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
dal := New(ctx, conn, encryption, pubSub, cjs, func(c libdal.Connection) artefacts.Service {
return nil
})

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
var testSHA = sha256.Sum(testContent)
Expand Down Expand Up @@ -200,7 +202,9 @@ func TestCreateArtefactConflict(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
dal := New(ctx, conn, encryption, pubSub, cjs, func(c libdal.Connection) artefacts.Service {
return nil
})

idch := make(chan sha256.SHA256, 2)

Expand Down
Loading

0 comments on commit 9003f9c

Please sign in to comment.