Skip to content

Commit

Permalink
it works
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 5, 2024
1 parent 64bd0e0 commit c4ff2b2
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 36 deletions.
26 changes: 18 additions & 8 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"

"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
googleremote "github.com/google/go-containerregistry/pkg/v1/remote"

"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/sha256"
)
Expand All @@ -35,6 +39,7 @@ type RegistryConfig struct {
type OCIArtifactService struct {
repository string
repoFactory func() (*remote.Repository, error)
auth authn.AuthConfig
}

type ArtefactRepository struct {
Expand All @@ -51,6 +56,10 @@ type ArtefactBlobs struct {
Size int64
}

func NewForTesting() *OCIArtifactService {
return NewOCIRegistryStorage(RegistryConfig{Registry: "127.0.0.1:15000/ftl-tests", AllowInsecure: true})
}

func NewOCIRegistryStorage(c RegistryConfig) *OCIArtifactService {
// Connect the registry targeting the specified container
repoFactory := func() (*remote.Repository, error) {
Expand All @@ -75,6 +84,7 @@ func NewOCIRegistryStorage(c RegistryConfig) *OCIArtifactService {
return &OCIArtifactService{
repository: c.Registry,
repoFactory: repoFactory,
auth: authn.AuthConfig{Username: c.Username, Password: c.Password},
}
}

Expand Down Expand Up @@ -157,18 +167,18 @@ func (s *OCIArtifactService) Upload(ctx context.Context, artefact Artefact) (sha
}

func (s *OCIArtifactService) Download(ctx context.Context, dg sha256.SHA256) (io.ReadCloser, error) {
registry, err := s.repoFactory()
// ORAS is really annoying, and needs you to know the size of the blob you're downloading
// So we are using google's go-containerregistry to do the actual download
// This is not great, we should remove oras at some point
newDigest, err := name.NewDigest(fmt.Sprintf("%s@sha256:%s", s.repository, dg.String()))
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.repository, err)
return nil, fmt.Errorf("unable to create digest '%s': %w", dg, err)
}
// ORAS is really annoying, and needs you to know the size of the blob you're downloading
// So we need to resolve the blob first to get the size

stream, err := registry.Blobs().Fetch(ctx, ocispec.Descriptor{Size: -1, Digest: digest.NewDigestFromHex("sha256", dg.String())})
layer, err := googleremote.Layer(newDigest, googleremote.WithAuthFromKeychain(authn.DefaultKeychain))
if err != nil {
return nil, fmt.Errorf("unable to download artefact from %s: %w", s.repository, err)
return nil, fmt.Errorf("unable to read layer '%s': %w", newDigest, err)
}
return stream, nil
return layer.Uncompressed()
}

func pushBlob(ctx context.Context, mediaType string, blob []byte, target oras.Target) (desc ocispec.Descriptor, err error) {
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/cronjobs/internal/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestNewCronJobsForModule(t *testing.T) {
cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs, artefacts.NewForTesting())
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute
decls := []schema.Decl{}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestNoCallToAcquire(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, nil)
dal := New(ctx, conn, encryption, pubSub, nil, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, libdal.ErrNotFound)
Expand Down
20 changes: 8 additions & 12 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dal
import (
"bytes"
"context"
"io"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -37,7 +36,7 @@ func TestDAL(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
dal := New(ctx, conn, encryption, pubSub, cjs, artefacts.NewForTesting())

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
var testSHA = sha256.Sum(testContent)
Expand Down Expand Up @@ -84,15 +83,15 @@ func TestDAL(t *testing.T) {
{Path: "dir/filename",
Executable: true,
Digest: testSHA,
Content: io.NopCloser(bytes.NewReader(testContent))},
},
},
}
expectedContent := artefactContent(t, deployment.Artefacts)
expectedContent := artefactHashes(t, deployment.Artefacts)

t.Run("GetDeployment", func(t *testing.T) {
actual, err := dal.GetDeployment(ctx, deploymentKey)
assert.NoError(t, err)
actualContent := artefactContent(t, actual.Artefacts)
actualContent := artefactHashes(t, actual.Artefacts)
assert.Equal(t, expectedContent, actualContent)
assert.Equal(t, deployment, actual)
})
Expand Down Expand Up @@ -200,7 +199,7 @@ func TestCreateArtefactConflict(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
dal := New(ctx, conn, encryption, pubSub, cjs, artefacts.NewForTesting())

idch := make(chan sha256.SHA256, 2)

Expand Down Expand Up @@ -237,14 +236,11 @@ func TestCreateArtefactConflict(t *testing.T) {
assert.Equal(t, ids[0], ids[1])
}

func artefactContent(t testing.TB, artefacts []*model.Artefact) [][]byte {
func artefactHashes(t testing.TB, artefacts []*model.Artefact) []sha256.SHA256 {
t.Helper()
var result [][]byte
var result []sha256.SHA256
for _, a := range artefacts {
content, err := io.ReadAll(a.Content)
assert.NoError(t, err)
result = append(result, content)
a.Content = nil
result = append(result, a.Digest)
}
return result
}
8 changes: 4 additions & 4 deletions backend/controller/timeline/internal/timeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func TestTimeline(t *testing.T) {
assert.NoError(t, err)

timeline := timeline2.New(ctx, conn, encryption)
registry := artefacts.NewDALRegistry(conn)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

key := model.NewControllerKey("localhost", strconv.Itoa(8080+1))
cjs := cronjobs.New(ctx, key, "test.com", encryption, timeline, conn)
controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, cjs)
registry := artefacts.NewForTesting()
controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, cjs, registry)

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)

Expand Down Expand Up @@ -268,9 +268,9 @@ func TestDeleteOldEvents(t *testing.T) {
assert.NoError(t, err)

timeline := timeline2.New(ctx, conn, encryption)
registry := artefacts.NewDALRegistry(conn)
registry := artefacts.NewForTesting()
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, nil)
controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, nil, registry)

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
var testSha sha256.SHA256
Expand Down
4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ services:
registry:
image: registry:2
ports:
- "5001:5000"
volumes:
- ./.registry:/var/lib/registry
- "15000:5000"

volumes:
grafana-storage: {}
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/go-logr/logr v1.4.2
github.com/go-viper/mapstructure/v2 v2.2.1
github.com/google/go-cmp v0.6.0
github.com/google/go-containerregistry v0.19.1
github.com/google/uuid v1.6.0
github.com/hashicorp/cronexpr v1.1.2
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
Expand All @@ -45,6 +46,7 @@ require (
github.com/mattn/go-isatty v0.0.20
github.com/multiformats/go-base36 v0.2.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
github.com/otiai10/copy v1.14.0
github.com/posener/complete v1.2.3
github.com/radovskyb/watcher v1.0.7
Expand Down Expand Up @@ -101,8 +103,12 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/cli v24.0.0+incompatible // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -123,26 +129,29 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/onsi/gomega v1.33.1 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/riywo/loginshell v0.0.0-20200815045211-7d26008be1ab // indirect
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/jsonrpc2 v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/vbatts/tar-split v0.11.3 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
Expand Down
23 changes: 23 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c4ff2b2

Please sign in to comment.