From b638f158d57f2b9b4a8476060bc29ed3e13459f7 Mon Sep 17 00:00:00 2001 From: Kairo Araujo Date: Mon, 27 May 2024 16:59:57 +0200 Subject: [PATCH] refactor: simplify archivista main cmd Simplifies Archivista main cmd moving all the service logic into the `pkg/server`. It allows users to use the Archivista Server instance as an Service Interface allowing to integrate along to API services. Signed-off-by: Kairo Araujo --- cmd/archivista/main.go | 123 +++----------------------- pkg/server/services.go | 191 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 111 deletions(-) create mode 100644 pkg/server/services.go diff --git a/cmd/archivista/main.go b/cmd/archivista/main.go index 6d8606a4..7a5448a6 100644 --- a/cmd/archivista/main.go +++ b/cmd/archivista/main.go @@ -21,7 +21,6 @@ package main import ( "context" - "fmt" "net" "net/http" "os" @@ -32,13 +31,7 @@ import ( nested "github.com/antonfisher/nested-logrus-formatter" "github.com/gorilla/handlers" - "github.com/in-toto/archivista/internal/artifactstore" - "github.com/in-toto/archivista/internal/config" - "github.com/in-toto/archivista/internal/metadatastorage/sqlstore" - "github.com/in-toto/archivista/internal/objectstorage/blobstore" - "github.com/in-toto/archivista/internal/objectstorage/filestore" - "github.com/in-toto/archivista/internal/server" - "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/in-toto/archivista/pkg/server" "github.com/sirupsen/logrus" ) @@ -57,76 +50,19 @@ func main() { defer cancel() startTime := time.Now() - serverOpts := make([]server.Option, 0) - logrus.Infof("executing phase 1: get config from environment (time since start: %s)", time.Since(startTime)) - now := time.Now() - - cfg := new(config.Config) - if err := cfg.Process(); err != nil { - logrus.Fatal(err) - } - - level, err := logrus.ParseLevel(cfg.LogLevel) - if err != nil { - logrus.Fatalf("invalid log level %s", cfg.LogLevel) - } - logrus.SetLevel(level) - - logrus.WithField("duration", time.Since(now)).Infof("completed phase 1: get config from environment") - - // ******************************************************************************** - logrus.Infof("executing phase 2: initializing storage clients (time since start: %s)", time.Since(startTime)) - // ******************************************************************************** - now = time.Now() - fileStore, fileStoreCh, err := initObjectStore(ctx, cfg) - if err != nil { - logrus.Fatalf("error initializing storage clients: %+v", err) - } - serverOpts = append(serverOpts, server.WithObjectStore(fileStore)) + archivistaService := &server.ArchivistaService{Ctx: ctx, Cfg: nil} - entClient, err := sqlstore.NewEntClient( - cfg.SQLStoreBackend, - cfg.SQLStoreConnectionString, - sqlstore.ClientWithMaxIdleConns(cfg.SQLStoreMaxIdleConnections), - sqlstore.ClientWithMaxOpenConns(cfg.SQLStoreMaxOpenConnections), - sqlstore.ClientWithConnMaxLifetime(cfg.SQLStoreConnectionMaxLifetime)) + server, err := archivistaService.Setup() if err != nil { - logrus.Fatalf("could not create ent client: %+v", err) + logrus.Fatalf("unable to setup archivista service: %+v", err) } - - sqlStore, sqlStoreCh, err := sqlstore.New(ctx, entClient) - if err != nil { - logrus.Fatalf("error initializing mysql client: %+v", err) - } - serverOpts = append(serverOpts, server.WithMetadataStore(sqlStore)) - - logrus.WithField("duration", time.Since(now)).Infof("completed phase 3: initializing storage clients") - // ******************************************************************************** - logrus.Infof("executing phase 3: create and register http service (time since start: %s)", time.Since(startTime)) + logrus.Infof("executing phase: create and register http service (time since start: %s)", time.Since(startTime)) // ******************************************************************************** - now = time.Now() - - // initialize the artifact store - if cfg.EnableArtifactStore { - wds, err := artifactstore.New(artifactstore.WithConfigFile(cfg.ArtifactStoreConfig)) - if err != nil { - logrus.Fatalf("could not create the artifact store: %+v", err) - } - - serverOpts = append(serverOpts, server.WithArtifactStore(wds)) - } - - // initialize the server - sqlClient := sqlStore.GetClient() - serverOpts = append(serverOpts, server.WithEntSqlClient(sqlClient)) - server, err := server.New(cfg, serverOpts...) - if err != nil { - logrus.Fatalf("could not create archivista server: %+v", err) - } + now := time.Now() - listenAddress := cfg.ListenOn + listenAddress := archivistaService.Cfg.ListenOn listenAddress = strings.ToLower(strings.TrimSpace(listenAddress)) proto := "" if strings.HasPrefix(listenAddress, "tcp://") { @@ -143,10 +79,10 @@ func main() { } srv := &http.Server{ Handler: handlers.CORS( - handlers.AllowedOrigins(cfg.CORSAllowOrigins), + handlers.AllowedOrigins(archivistaService.Cfg.CORSAllowOrigins), handlers.AllowedMethods([]string{"GET", "POST", "OPTIONS"}), handlers.AllowedHeaders([]string{"Accept", "Content-Type", "Content-Length", "Accept-Encoding", "X-CSRF-Token", "Authorization"}), - )(server.Router()), + )(server.Router()), ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, } @@ -156,47 +92,12 @@ func main() { } }() - logrus.WithField("duration", time.Since(now)).Infof("completed phase 5: create and register http service") + logrus.WithField("duration", time.Since(now)).Infof("completed phase: create and register http service") logrus.Infof("startup complete (time since start: %s)", time.Since(startTime)) <-ctx.Done() - <-fileStoreCh - <-sqlStoreCh + <-archivistaService.GetFileStoreCh() + <-archivistaService.GetSQLStoreCh() logrus.Infof("exiting, uptime: %v", time.Since(startTime)) } - -func initObjectStore(ctx context.Context, cfg *config.Config) (server.StorerGetter, <-chan error, error) { - switch strings.ToUpper(cfg.StorageBackend) { - case "FILE": - return filestore.New(ctx, cfg.FileDir, cfg.FileServeOn) - - case "BLOB": - var creds *credentials.Credentials - if cfg.BlobStoreCredentialType == "IAM" { - creds = credentials.NewIAM("") - } else if cfg.BlobStoreCredentialType == "ACCESS_KEY" { - creds = credentials.NewStaticV4(cfg.BlobStoreAccessKeyId, cfg.BlobStoreSecretAccessKeyId, "") - } else { - logrus.Fatalln("invalid blob store credential type: ", cfg.BlobStoreCredentialType) - } - return blobstore.New( - ctx, - cfg.BlobStoreEndpoint, - creds, - cfg.BlobStoreBucketName, - cfg.BlobStoreUseTLS, - ) - - case "": - errCh := make(chan error) - go func() { - <-ctx.Done() - close(errCh) - }() - return nil, errCh, nil - - default: - return nil, nil, fmt.Errorf("unknown storage backend: %s", cfg.StorageBackend) - } -} diff --git a/pkg/server/services.go b/pkg/server/services.go new file mode 100644 index 00000000..5255864f --- /dev/null +++ b/pkg/server/services.go @@ -0,0 +1,191 @@ +// Copyright 2024 The Archivista Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// A note: this follows a pattern followed by network service mesh. +// The pattern was copied from the Network Service Mesh Project +// and modified for use here. The original code was published under the +// Apache License V2. + +package server + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/in-toto/archivista/pkg/artifactstore" + "github.com/in-toto/archivista/pkg/config" + "github.com/in-toto/archivista/pkg/metadatastorage/sqlstore" + "github.com/in-toto/archivista/pkg/objectstorage/blobstore" + "github.com/in-toto/archivista/pkg/objectstorage/filestore" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/sirupsen/logrus" +) + +// Service is the interface for the Archivista service +type Service interface { + Setup() (Server, error) + GetConfig() *config.Config + GetFileStoreCh() chan error + GetSQLStoreCh() chan error +} + +// ArchivistaService is the implementation of the Archivista service +type ArchivistaService struct { + Ctx context.Context // context for the service + Cfg *config.Config // configuration for the service (if none it uses environment variables) + fileStoreCh <-chan error + sqlStoreCh <-chan error +} + +// Setup Archivista Service +func (a *ArchivistaService) Setup() (*Server, error) { + var ( + level logrus.Level + err error + sqlStore *sqlstore.Store + fileStore StorerGetter + ) + serverOpts := make([]Option, 0) + + startTime := time.Now() + now := time.Now() + if a.Cfg == nil { + + logrus.Infof("executing: get config from environment (time since start: %s)", time.Since(startTime)) + + a.Cfg = new(config.Config) + if err := a.Cfg.Process(); err != nil { + logrus.Fatal(err) + } + level, err = logrus.ParseLevel(a.Cfg.LogLevel) + if err != nil { + logrus.Fatalf("invalid log level %s", a.Cfg.LogLevel) + } + logrus.WithField("duration", time.Since(now)).Infof("completed phase: get config from environment") + } else { + logrus.Infof("executing: load given config (time since start: %s)", time.Since(startTime)) + level, err = logrus.ParseLevel(a.Cfg.LogLevel) + if err != nil { + logrus.Fatalf("invalid log level %s", a.Cfg.LogLevel) + } + logrus.WithField("duration", time.Since(now)).Infof("completed phase: load given config") + } + logrus.SetLevel(level) + + // ******************************************************************************** + logrus.Infof("executing phase: initializing storage clients (time since start: %s)", time.Since(startTime)) + // ******************************************************************************** + now = time.Now() + fileStore, a.fileStoreCh, err = a.initObjectStore() + if err != nil { + logrus.Fatalf("could not create object store: %+v", err) + } + serverOpts = append(serverOpts, WithObjectStore(fileStore)) + + entClient, err := sqlstore.NewEntClient( + a.Cfg.SQLStoreBackend, + a.Cfg.SQLStoreConnectionString, + sqlstore.ClientWithMaxIdleConns(a.Cfg.SQLStoreMaxIdleConnections), + sqlstore.ClientWithMaxOpenConns(a.Cfg.SQLStoreMaxOpenConnections), + sqlstore.ClientWithConnMaxLifetime(a.Cfg.SQLStoreConnectionMaxLifetime), + ) + + if err != nil { + logrus.Fatalf("could not create ent client: %+v", err) + } + + // Continue with the existing setup code for the SQLStore + sqlStore, a.sqlStoreCh, err = sqlstore.New(context.Background(), entClient) + if err != nil { + logrus.Fatalf("error initializing new SQLStore: %+v", err) + } + serverOpts = append(serverOpts, WithMetadataStore(sqlStore)) + + // Add SQL client for ent + sqlClient := sqlStore.GetClient() + serverOpts = append(serverOpts, WithEntSqlClient(sqlClient)) + + // initialize the artifact store + if a.Cfg.EnableArtifactStore { + wds, err := artifactstore.New(artifactstore.WithConfigFile(a.Cfg.ArtifactStoreConfig)) + if err != nil { + logrus.Fatalf("could not create the artifact store: %+v", err) + } + + serverOpts = append(serverOpts, WithArtifactStore(wds)) + } + + // Create the Archivista server with all options + server, err := New(a.Cfg, serverOpts...) + if err != nil { + logrus.Fatalf("could not create archivista server: %+v", err) + } + + // Ensure background processes are managed + go func() { + <-a.sqlStoreCh + <-a.fileStoreCh + }() + + logrus.WithField("duration", time.Since(now)).Infof("completed phase: initializing storage clients") + + return &server, nil +} + +// GetFileStoreCh returns the file store channel +func (a *ArchivistaService) GetFileStoreCh() <-chan error { + return a.fileStoreCh +} + +// GetSQLStoreCh returns the SQL store channel +func (a *ArchivistaService) GetSQLStoreCh() <-chan error { + return a.sqlStoreCh +} + +func (a *ArchivistaService) initObjectStore() (StorerGetter, <-chan error, error) { + switch strings.ToUpper(a.Cfg.StorageBackend) { + case "FILE": + return filestore.New(a.Ctx, a.Cfg.FileDir, a.Cfg.FileServeOn) + + case "BLOB": + var creds *credentials.Credentials + if a.Cfg.BlobStoreCredentialType == "IAM" { + creds = credentials.NewIAM("") + } else if a.Cfg.BlobStoreCredentialType == "ACCESS_KEY" { + creds = credentials.NewStaticV4(a.Cfg.BlobStoreAccessKeyId, a.Cfg.BlobStoreSecretAccessKeyId, "") + } else { + logrus.Fatalf("invalid blob store credential type: %s", a.Cfg.BlobStoreCredentialType) + } + return blobstore.New( + a.Ctx, + a.Cfg.BlobStoreEndpoint, + creds, + a.Cfg.BlobStoreBucketName, + a.Cfg.BlobStoreUseTLS, + ) + + case "": + errCh := make(chan error) + go func() { + <-a.Ctx.Done() + close(errCh) + }() + return nil, errCh, nil + + default: + return nil, nil, fmt.Errorf("unknown storage backend: %s", a.Cfg.StorageBackend) + } +}