From 4be9b0d04fd99e87f265469af22472f52c755b1c Mon Sep 17 00:00:00 2001 From: nouseforaname <34882943+nouseforaname@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:07:16 +0200 Subject: [PATCH 1/5] write lockfiles for in flight services we need a mechanism for the bosh drain lifecycle to know if there are terraform processes in flight. the lockfiles do just that. the filesystem approach was favoured over an in-memory map to allow an easier interface with bosh. assuming the csb process crashes while shutting down, recovering the SIs that were in flight is tricky because the in memory state got lost when the crash happened. additionally having the files use the GUIDs for their names allows us to log the exact SIs that were not successfully finished via the drain script. --- brokerapi/broker/brokerfakes/fake_storage.go | 148 ++++++++++++++++++ internal/storage/storage.go | 24 ++- internal/storage/terraform_deployment.go | 28 ++++ internal/storage/terraform_deployment_test.go | 46 +++++- .../fake_service_provider_storage.go | 148 ++++++++++++++++++ pkg/broker/service_provider.go | 2 + pkg/providers/tf/deployment_manager.go | 14 +- pkg/providers/tf/provider.go | 5 +- 8 files changed, 405 insertions(+), 10 deletions(-) diff --git a/brokerapi/broker/brokerfakes/fake_storage.go b/brokerapi/broker/brokerfakes/fake_storage.go index f0de811b3..cd6ce9688 100644 --- a/brokerapi/broker/brokerfakes/fake_storage.go +++ b/brokerapi/broker/brokerfakes/fake_storage.go @@ -197,6 +197,17 @@ type FakeStorage struct { result1 storage.TerraformDeployment result2 error } + RemoveLockFileStub func(string) error + removeLockFileMutex sync.RWMutex + removeLockFileArgsForCall []struct { + arg1 string + } + removeLockFileReturns struct { + result1 error + } + removeLockFileReturnsOnCall map[int]struct { + result1 error + } StoreBindRequestDetailsStub func(storage.BindRequestDetails) error storeBindRequestDetailsMutex sync.RWMutex storeBindRequestDetailsArgsForCall []struct { @@ -242,6 +253,17 
@@ type FakeStorage struct { storeTerraformDeploymentReturnsOnCall map[int]struct { result1 error } + WriteLockFileStub func(string) error + writeLockFileMutex sync.RWMutex + writeLockFileArgsForCall []struct { + arg1 string + } + writeLockFileReturns struct { + result1 error + } + writeLockFileReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -1193,6 +1215,67 @@ func (fake *FakeStorage) GetTerraformDeploymentReturnsOnCall(i int, result1 stor }{result1, result2} } +func (fake *FakeStorage) RemoveLockFile(arg1 string) error { + fake.removeLockFileMutex.Lock() + ret, specificReturn := fake.removeLockFileReturnsOnCall[len(fake.removeLockFileArgsForCall)] + fake.removeLockFileArgsForCall = append(fake.removeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.RemoveLockFileStub + fakeReturns := fake.removeLockFileReturns + fake.recordInvocation("RemoveLockFile", []interface{}{arg1}) + fake.removeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeStorage) RemoveLockFileCallCount() int { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + return len(fake.removeLockFileArgsForCall) +} + +func (fake *FakeStorage) RemoveLockFileCalls(stub func(string) error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = stub +} + +func (fake *FakeStorage) RemoveLockFileArgsForCall(i int) string { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + argsForCall := fake.removeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeStorage) RemoveLockFileReturns(result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + fake.removeLockFileReturns = struct { + result1 error + }{result1} +} + +func 
(fake *FakeStorage) RemoveLockFileReturnsOnCall(i int, result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + if fake.removeLockFileReturnsOnCall == nil { + fake.removeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.removeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeStorage) StoreBindRequestDetails(arg1 storage.BindRequestDetails) error { fake.storeBindRequestDetailsMutex.Lock() ret, specificReturn := fake.storeBindRequestDetailsReturnsOnCall[len(fake.storeBindRequestDetailsArgsForCall)] @@ -1438,6 +1521,67 @@ func (fake *FakeStorage) StoreTerraformDeploymentReturnsOnCall(i int, result1 er }{result1} } +func (fake *FakeStorage) WriteLockFile(arg1 string) error { + fake.writeLockFileMutex.Lock() + ret, specificReturn := fake.writeLockFileReturnsOnCall[len(fake.writeLockFileArgsForCall)] + fake.writeLockFileArgsForCall = append(fake.writeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.WriteLockFileStub + fakeReturns := fake.writeLockFileReturns + fake.recordInvocation("WriteLockFile", []interface{}{arg1}) + fake.writeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeStorage) WriteLockFileCallCount() int { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + return len(fake.writeLockFileArgsForCall) +} + +func (fake *FakeStorage) WriteLockFileCalls(stub func(string) error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = stub +} + +func (fake *FakeStorage) WriteLockFileArgsForCall(i int) string { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + argsForCall := fake.writeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeStorage) WriteLockFileReturns(result1 error) { + 
fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + fake.writeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) WriteLockFileReturnsOnCall(i int, result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + if fake.writeLockFileReturnsOnCall == nil { + fake.writeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -1471,6 +1615,8 @@ func (fake *FakeStorage) Invocations() map[string][][]interface{} { defer fake.getServiceInstanceDetailsMutex.RUnlock() fake.getTerraformDeploymentMutex.RLock() defer fake.getTerraformDeploymentMutex.RUnlock() + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() fake.storeBindRequestDetailsMutex.RLock() defer fake.storeBindRequestDetailsMutex.RUnlock() fake.storeProvisionRequestDetailsMutex.RLock() @@ -1479,6 +1625,8 @@ func (fake *FakeStorage) Invocations() map[string][][]interface{} { defer fake.storeServiceInstanceDetailsMutex.RUnlock() fake.storeTerraformDeploymentMutex.RLock() defer fake.storeTerraformDeploymentMutex.RUnlock() + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 96a992208..335c90a18 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,17 +1,31 @@ // Package storage implements a Database Access Object (DAO) package storage -import "gorm.io/gorm" +import ( + "os" + + "gorm.io/gorm" +) type Storage struct { - db *gorm.DB - encryptor Encryptor + db *gorm.DB + 
encryptor Encryptor + lockFileDir string } func New(db *gorm.DB, encryptor Encryptor) *Storage { + // the VM based HA deployment requires a drain mechanism. LockFiles are a simple solution. + // but not every environment will opt for using VM based deployments. So detect if the lockfile + // directory is present. + + dirDefault := os.Getenv("CSB_LOCKFILE_DIR") + if _, err := os.Stat(dirDefault); err != nil { + dirDefault, _ = os.MkdirTemp("/tmp/", "lockfiles") + } return &Storage{ - db: db, - encryptor: encryptor, + db: db, + encryptor: encryptor, + lockFileDir: dirDefault, } } diff --git a/internal/storage/terraform_deployment.go b/internal/storage/terraform_deployment.go index 9d6e022aa..223793293 100644 --- a/internal/storage/terraform_deployment.go +++ b/internal/storage/terraform_deployment.go @@ -2,6 +2,8 @@ package storage import ( "fmt" + "os" + "strings" "time" "github.com/hashicorp/go-version" @@ -156,3 +158,29 @@ func (s *Storage) loadTerraformDeploymentIfExists(id string, receiver any) error return s.db.Where("id = ?", id).First(receiver).Error } + +func (s *Storage) LockFilesExist() bool { + entries, _ := os.ReadDir(s.lockFileDir) + return len(entries) != 0 +} + +func (s *Storage) WriteLockFile(deploymentID string) error { + return os.WriteFile(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID)), []byte{}, 0o644) +} + +func (s *Storage) RemoveLockFile(deploymentID string) error { + return os.Remove(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID))) +} + +func (s *Storage) GetLockedDeploymentIds() ([]string, error) { + entries, err := os.ReadDir(s.lockFileDir) + var names []string + for _, entry := range entries { + names = append(names, strings.ReplaceAll(entry.Name(), "_", ":")) + } + return names, err +} + +func sanitizeFileName(name string) string { + return strings.ReplaceAll(name, ":", "_") +} diff --git a/internal/storage/terraform_deployment_test.go b/internal/storage/terraform_deployment_test.go index 
1dfe0ada9..86bd3a8fc 100644 --- a/internal/storage/terraform_deployment_test.go +++ b/internal/storage/terraform_deployment_test.go @@ -206,10 +206,54 @@ var _ = Describe("TerraformDeployments", func() { Expect(store.DeleteTerraformDeployment("not-there")).NotTo(HaveOccurred()) }) }) + + Describe("LockFileExists", func() { + It("reports correct status", func() { + Expect(store.WriteLockFile("1234")).To(Succeed()) + Expect(store.WriteLockFile("5678")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeTrue()) + + Expect(store.RemoveLockFile("1234")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeTrue()) + + Expect(store.RemoveLockFile("5678")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeFalse()) + }) + }) + + Describe("GetLockedDeploymentIds", func() { + It("returns correct names", func() { + names, err := store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(BeEmpty()) + + Expect(store.WriteLockFile("tf:1234:")).To(Succeed()) + Expect(store.WriteLockFile("tf:5678:9123")).To(Succeed()) + + names, err = store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(ContainElements("tf:1234:", "tf:5678:9123")) + + Expect(store.RemoveLockFile("tf:1234:")).To(Succeed()) + + names, err = store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(ContainElements("tf:5678:9123")) + Expect(names).ToNot(ContainElements("tf:1234:")) + + Expect(store.RemoveLockFile("tf:5678:9123")).To(Succeed()) + + names, err = store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(BeEmpty()) + }) + }) }) func addFakeTerraformDeployments() { - Expect(db.Create(&models.TerraformDeployment{ ID: "fake-id-1", Workspace: fakeWorkspace("fake-1", "1.2.3"), diff --git a/pkg/broker/brokerfakes/fake_service_provider_storage.go b/pkg/broker/brokerfakes/fake_service_provider_storage.go index 726eb81e4..c194ea7dd 100644 --- 
a/pkg/broker/brokerfakes/fake_service_provider_storage.go +++ b/pkg/broker/brokerfakes/fake_service_provider_storage.go @@ -59,6 +59,17 @@ type FakeServiceProviderStorage struct { result1 storage.TerraformDeployment result2 error } + RemoveLockFileStub func(string) error + removeLockFileMutex sync.RWMutex + removeLockFileArgsForCall []struct { + arg1 string + } + removeLockFileReturns struct { + result1 error + } + removeLockFileReturnsOnCall map[int]struct { + result1 error + } StoreTerraformDeploymentStub func(storage.TerraformDeployment) error storeTerraformDeploymentMutex sync.RWMutex storeTerraformDeploymentArgsForCall []struct { @@ -70,6 +81,17 @@ type FakeServiceProviderStorage struct { storeTerraformDeploymentReturnsOnCall map[int]struct { result1 error } + WriteLockFileStub func(string) error + writeLockFileMutex sync.RWMutex + writeLockFileArgsForCall []struct { + arg1 string + } + writeLockFileReturns struct { + result1 error + } + writeLockFileReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -327,6 +349,67 @@ func (fake *FakeServiceProviderStorage) GetTerraformDeploymentReturnsOnCall(i in }{result1, result2} } +func (fake *FakeServiceProviderStorage) RemoveLockFile(arg1 string) error { + fake.removeLockFileMutex.Lock() + ret, specificReturn := fake.removeLockFileReturnsOnCall[len(fake.removeLockFileArgsForCall)] + fake.removeLockFileArgsForCall = append(fake.removeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.RemoveLockFileStub + fakeReturns := fake.removeLockFileReturns + fake.recordInvocation("RemoveLockFile", []interface{}{arg1}) + fake.removeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileCallCount() int { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + return 
len(fake.removeLockFileArgsForCall) +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileCalls(stub func(string) error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = stub +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileArgsForCall(i int) string { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + argsForCall := fake.removeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileReturns(result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + fake.removeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileReturnsOnCall(i int, result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + if fake.removeLockFileReturnsOnCall == nil { + fake.removeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.removeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceProviderStorage) StoreTerraformDeployment(arg1 storage.TerraformDeployment) error { fake.storeTerraformDeploymentMutex.Lock() ret, specificReturn := fake.storeTerraformDeploymentReturnsOnCall[len(fake.storeTerraformDeploymentArgsForCall)] @@ -388,6 +471,67 @@ func (fake *FakeServiceProviderStorage) StoreTerraformDeploymentReturnsOnCall(i }{result1} } +func (fake *FakeServiceProviderStorage) WriteLockFile(arg1 string) error { + fake.writeLockFileMutex.Lock() + ret, specificReturn := fake.writeLockFileReturnsOnCall[len(fake.writeLockFileArgsForCall)] + fake.writeLockFileArgsForCall = append(fake.writeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.WriteLockFileStub + fakeReturns := fake.writeLockFileReturns + fake.recordInvocation("WriteLockFile", []interface{}{arg1}) + 
fake.writeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceProviderStorage) WriteLockFileCallCount() int { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + return len(fake.writeLockFileArgsForCall) +} + +func (fake *FakeServiceProviderStorage) WriteLockFileCalls(stub func(string) error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = stub +} + +func (fake *FakeServiceProviderStorage) WriteLockFileArgsForCall(i int) string { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + argsForCall := fake.writeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceProviderStorage) WriteLockFileReturns(result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + fake.writeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceProviderStorage) WriteLockFileReturnsOnCall(i int, result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + if fake.writeLockFileReturnsOnCall == nil { + fake.writeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceProviderStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -399,8 +543,12 @@ func (fake *FakeServiceProviderStorage) Invocations() map[string][][]interface{} defer fake.getServiceBindingIDsForServiceInstanceMutex.RUnlock() fake.getTerraformDeploymentMutex.RLock() defer fake.getTerraformDeploymentMutex.RUnlock() + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() fake.storeTerraformDeploymentMutex.RLock() defer 
fake.storeTerraformDeploymentMutex.RUnlock() + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/broker/service_provider.go b/pkg/broker/service_provider.go index 1cd6ad535..f28cc0758 100644 --- a/pkg/broker/service_provider.go +++ b/pkg/broker/service_provider.go @@ -77,4 +77,6 @@ type ServiceProviderStorage interface { DeleteTerraformDeployment(id string) error ExistsTerraformDeployment(id string) (bool, error) GetServiceBindingIDsForServiceInstance(serviceInstanceID string) ([]string, error) + WriteLockFile(guid string) error + RemoveLockFile(guid string) error } diff --git a/pkg/providers/tf/deployment_manager.go b/pkg/providers/tf/deployment_manager.go index 188581ec5..3a3f6c19b 100644 --- a/pkg/providers/tf/deployment_manager.go +++ b/pkg/providers/tf/deployment_manager.go @@ -51,7 +51,7 @@ func (d *DeploymentManager) MarkOperationStarted(deployment *storage.TerraformDe return err } - return nil + return d.store.WriteLockFile(deployment.ID) } func (d *DeploymentManager) MarkOperationFinished(deployment *storage.TerraformDeployment, err error) error { @@ -74,8 +74,16 @@ func (d *DeploymentManager) MarkOperationFinished(deployment *storage.TerraformD }) } - - return d.store.StoreTerraformDeployment(*deployment) + err = d.store.StoreTerraformDeployment(*deployment) + if err != nil { + d.logger.Error("store-state", err, lager.Data{ + "deploymentID": deployment.ID, + "message": deployment.LastOperationMessage, + }) + } else { + d.logger.Info(fmt.Sprintf("successfully stored state for %s", deployment.ID)) + } + return d.store.RemoveLockFile(deployment.ID) } func (d *DeploymentManager) OperationStatus(deploymentID string) (bool, string, error) { diff --git a/pkg/providers/tf/provider.go b/pkg/providers/tf/provider.go index 6c9828995..6e2eaeae7 100644 --- a/pkg/providers/tf/provider.go +++ 
b/pkg/providers/tf/provider.go @@ -96,7 +96,10 @@ func (provider *TerraformProvider) create(ctx context.Context, vars *varcontext. } else { err = provider.DefaultInvoker().Apply(ctx, newWorkspace) } - _ = provider.MarkOperationFinished(&deployment, err) + err = provider.MarkOperationFinished(&deployment, err) + if err != nil { + provider.logger.Error("MarkOperationFinished", err) + } }() return tfID, nil From 113605a782dacc2794ff5d6764e0e6fc436f06f3 Mon Sep 17 00:00:00 2001 From: nouseforaname <34882943+nouseforaname@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:19:18 +0200 Subject: [PATCH 2/5] add graceful shutdown we want to enable graceful shutdowns for the csb process so we can ensure in flight terraform is able to finish before the broker process exits. this hopes to make the broker more resilient against leaving orphaned resources in the underlying IaaS. --- cmd/serve.go | 72 +++++++++++++++++++++------- integrationtest/import_state_test.go | 2 +- internal/testdrive/broker.go | 11 ++++- internal/testdrive/broker_start.go | 2 +- internal/testdrive/runner.go | 22 ++++++++- 5 files changed, 87 insertions(+), 22 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index b69135ccb..4ef69f50c 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -22,7 +22,11 @@ import ( "io" "log/slog" "net/http" + "os" + "os/signal" "strings" + "syscall" + "time" "code.cloudfoundry.org/lager/v3" osbapiBroker "github.com/cloudfoundry/cloud-service-broker/v2/brokerapi/broker" @@ -55,6 +59,8 @@ const ( tlsKeyProp = "api.tlsKey" encryptionPasswords = "db.encryption.passwords" encryptionEnabled = "db.encryption.enabled" + + shutdownTimeout = time.Hour ) var cfCompatibilityToggle = toggles.Features.Toggle("enable-cf-sharing", false, `Set all services to have the Sharable flag so they can be shared @@ -133,7 +139,10 @@ func serve() { if err != nil { logger.Error("failed to get database connection", err) } - startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), 
credentials) + csbStore := storage.New(db, encryptor) + httpServer := startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials) + + listenForShutdownSignal(httpServer, logger, csbStore) } func serveDocs() { @@ -188,7 +197,7 @@ func setupDBEncryption(db *gorm.DB, logger lager.Logger) storage.Encryptor { return config.Encryptor } -func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.Handler, store *storage.Storage, credentials brokerapi.BrokerCredentials) { +func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.Handler, store *storage.Storage, credentials brokerapi.BrokerCredentials) *http.Server { logger := utils.NewLogger("cloud-service-broker") docsHandler := server.DocsHandler(registry) @@ -225,24 +234,44 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H Addr: fmt.Sprintf("%s:%s", host, port), Handler: router, } - var err error - if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" { - err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath) - } else { - err = httpServer.ListenAndServe() - } - // when the server is receiving a signal, we probably do not want to panic. 
- if err != http.ErrServerClosed { - logger.Fatal("Failed to start broker", err) - } + go func() { + var err error + if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" { + err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath) + } else { + err = httpServer.ListenAndServe() + } + if err == http.ErrServerClosed { + logger.Info("shutting down csb") + } else { + logger.Fatal("Failed to start broker", err) + } + }() + return httpServer } -func labelName(label string) string { - switch label { - case "": - return "none" +func listenForShutdownSignal(httpServer *http.Server, logger lager.Logger, store *storage.Storage) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGTERM) + + signalReceived := <-sigChan + + switch signalReceived { + + case syscall.SIGTERM: + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout) + if err := httpServer.Shutdown(shutdownCtx); err != nil { + logger.Fatal("shutdown error: %v", err) + } + logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish") + defer shutdownRelease() + for store.LockFilesExist() { + logger.Info("draining csb in progress") + time.Sleep(time.Second * 1) + } + logger.Info("draining complete") default: - return label + logger.Info(fmt.Sprintf("csb does not handle the %s interrupt signal", signalReceived)) } } @@ -288,3 +317,12 @@ func importStateHandler(store *storage.Storage) http.Handler { } }) } + +func labelName(label string) string { + switch label { + case "": + return "none" + default: + return label + } +} diff --git a/integrationtest/import_state_test.go b/integrationtest/import_state_test.go index f7e453c4c..36ab237d7 100644 --- a/integrationtest/import_state_test.go +++ b/integrationtest/import_state_test.go @@ -31,7 +31,7 @@ var _ = Describe("Import State", func() { broker = must(testdrive.StartBroker(csb, brokerpak, database)) DeferCleanup(func() { - 
Expect(broker.Stop()).To(Succeed()) + Expect(broker.Terminate()).To(Succeed()) cleanup(brokerpak) }) }) diff --git a/internal/testdrive/broker.go b/internal/testdrive/broker.go index 416d70deb..f2259483b 100644 --- a/internal/testdrive/broker.go +++ b/internal/testdrive/broker.go @@ -23,6 +23,15 @@ func (b *Broker) Stop() error { case b == nil, b.runner == nil: return nil default: - return b.runner.stop() + return b.runner.gracefullStop() + } +} + +func (b *Broker) Terminate() error { + switch { + case b == nil, b.runner == nil: + return nil + default: + return b.runner.forceStop() } } diff --git a/internal/testdrive/broker_start.go b/internal/testdrive/broker_start.go index 7831b189f..d71986553 100644 --- a/internal/testdrive/broker_start.go +++ b/internal/testdrive/broker_start.go @@ -115,7 +115,7 @@ func StartBroker(csbPath, bpk, db string, opts ...StartBrokerOption) (*Broker, e case err == nil && response.StatusCode == http.StatusOK: return &broker, nil case time.Since(start) > time.Minute: - if err := broker.runner.stop(); err != nil { + if err := broker.runner.forceStop(); err != nil { return nil, err } return nil, fmt.Errorf("timed out after %s waiting for broker to start: %s\n%s", time.Since(start), stdout.String(), stderr.String()) diff --git a/internal/testdrive/runner.go b/internal/testdrive/runner.go index 1d4736e3b..68aa89481 100644 --- a/internal/testdrive/runner.go +++ b/internal/testdrive/runner.go @@ -2,6 +2,7 @@ package testdrive import ( "os/exec" + "syscall" "time" ) @@ -26,12 +27,29 @@ func (r *runner) error(err error) *runner { return r } -func (r *runner) stop() error { +func (r *runner) gracefullStop() error { if r.exited { return nil } if r.cmd != nil && r.cmd.Process != nil { - if err := r.cmd.Process.Kill(); err != nil { + if err := r.cmd.Process.Signal(syscall.SIGTERM); err != nil { + return err + } + } + + for !r.exited { + time.Sleep(time.Millisecond) + } + + return nil +} + +func (r *runner) forceStop() error { + if r.exited { + 
return nil + } + if r.cmd != nil && r.cmd.Process != nil { + if err := r.cmd.Process.Signal(syscall.SIGKILL); err != nil { return err } } From 46824dba3f6c97e0c34884f1bf8c26aef033bb13 Mon Sep 17 00:00:00 2001 From: nouseforaname <34882943+nouseforaname@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:21:09 +0200 Subject: [PATCH 3/5] move recover in progress to storage package --- cmd/serve.go | 9 +- dbservice/dbservice.go | 3 - dbservice/recover_in_progress_operations.go | 24 ----- .../recover_in_progress_operations_test.go | 60 ------------ .../storage/recover_in_progress_operations.go | 56 +++++++++++ .../recover_in_progress_operations_test.go | 98 +++++++++++++++++++ 6 files changed, 161 insertions(+), 89 deletions(-) delete mode 100644 dbservice/recover_in_progress_operations.go delete mode 100644 dbservice/recover_in_progress_operations_test.go create mode 100644 internal/storage/recover_in_progress_operations.go create mode 100644 internal/storage/recover_in_progress_operations_test.go diff --git a/cmd/serve.go b/cmd/serve.go index 4ef69f50c..6d07274c0 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -108,7 +108,13 @@ func serve() { logger.Fatal("Error initializing service broker config", err) } var serviceBroker domain.ServiceBroker - serviceBroker, err = osbapiBroker.New(cfg, storage.New(db, encryptor), logger) + csbStore := storage.New(db, encryptor) + err = csbStore.RecoverInProgressOperations(logger) + if err != nil { + logger.Fatal("Error recovering in-progress operations", err) + } + + serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger) if err != nil { logger.Fatal("Error initializing service broker", err) } @@ -139,7 +145,6 @@ func serve() { if err != nil { logger.Error("failed to get database connection", err) } - csbStore := storage.New(db, encryptor) httpServer := startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials) listenForShutdownSignal(httpServer, logger, csbStore) diff --git a/dbservice/dbservice.go 
b/dbservice/dbservice.go index 10b1a8faa..3aaf70502 100644 --- a/dbservice/dbservice.go +++ b/dbservice/dbservice.go @@ -43,9 +43,6 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB { if err := RunMigrations(db); err != nil { panic(fmt.Sprintf("Error migrating database: %s", err)) } - if err := recoverInProgressOperations(db, logger); err != nil { - panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) - } }) return db } diff --git a/dbservice/recover_in_progress_operations.go b/dbservice/recover_in_progress_operations.go deleted file mode 100644 index 9b9c1a549..000000000 --- a/dbservice/recover_in_progress_operations.go +++ /dev/null @@ -1,24 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - "gorm.io/gorm" -) - -func recoverInProgressOperations(db *gorm.DB, logger lager.Logger) error { - logger = logger.Session("recover-in-progress-operations") - - var terraformDeploymentBatch []models.TerraformDeployment - result := db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { - for i := range terraformDeploymentBatch { - terraformDeploymentBatch[i].LastOperationState = "failed" - terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress" - logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) - } - - return tx.Save(&terraformDeploymentBatch).Error - }) - - return result.Error -} diff --git a/dbservice/recover_in_progress_operations_test.go b/dbservice/recover_in_progress_operations_test.go deleted file mode 100644 index c757e94ea..000000000 --- a/dbservice/recover_in_progress_operations_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3/lagertest" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - . 
"github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "gorm.io/driver/sqlite" - "gorm.io/gorm" -) - -var _ = Describe("RecoverInProgressOperations()", func() { - It("recovers the expected operations", func() { - const ( - recoverID = "fake-id-to-recover" - okID = "fake-id-that-does-not-need-to-be-recovered" - ) - - // Setup - db, err := gorm.Open(sqlite.Open(":memory:"), nil) - Expect(err).NotTo(HaveOccurred()) - Expect(db.Migrator().CreateTable(&models.TerraformDeployment{})).To(Succeed()) - - Expect(db.Create(&models.TerraformDeployment{ - ID: recoverID, - LastOperationType: "fake-type", - LastOperationState: "in progress", - LastOperationMessage: "fake-type in progress", - }).Error).To(Succeed()) - Expect(db.Create(&models.TerraformDeployment{ - ID: okID, - LastOperationType: "fake-type", - LastOperationState: "succeeded", - LastOperationMessage: "fake-type succeeded", - }).Error).To(Succeed()) - - // Call the function - logger := lagertest.NewTestLogger("test") - recoverInProgressOperations(db, logger) - - // Behaviors - By("marking the in-progress operation as failed") - var r1 models.TerraformDeployment - Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) - Expect(r1.LastOperationState).To(Equal("failed")) - Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) - - By("no updating other operations") - var r2 models.TerraformDeployment - Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) - Expect(r2.LastOperationState).To(Equal("succeeded")) - Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) - - By("logging the expected message") - Expect(logger.Buffer().Contents()).To(SatisfyAll( - ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), - ContainSubstring(`"workspace_id":"fake-id-to-recover"`), - )) - }) -}) diff --git a/internal/storage/recover_in_progress_operations.go b/internal/storage/recover_in_progress_operations.go new file 
mode 100644 index 000000000..ae8f48c9c --- /dev/null +++ b/internal/storage/recover_in_progress_operations.go @@ -0,0 +1,56 @@ +package storage + +import ( + "os" + + "code.cloudfoundry.org/lager/v3" + "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" + "gorm.io/gorm" +) + +const FailedMessage = "the broker restarted while the operation was in progress" + +func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error { + logger = logger.Session("recover-in-progress-operations") + + // We only wan't to fail interrupted service instances if we detect that we run as a CF APP. + // VM based csb instances implement a drain mechanism and should need this. Additionally VM + // based csb deployments are scalable horizontally and the below would fail in flight instances + // of another csb process. + if os.Getenv("CF_INSTANCE_GUID") != "" { + var terraformDeploymentBatch []models.TerraformDeployment + result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { + for i := range terraformDeploymentBatch { + terraformDeploymentBatch[i].LastOperationState = "failed" + terraformDeploymentBatch[i].LastOperationMessage = FailedMessage + logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) + } + + return tx.Save(&terraformDeploymentBatch).Error + }) + + return result.Error + } else { + deploymentIds, err := s.GetLockedDeploymentIds() + if err != nil { + return err + } + + for _, id := range deploymentIds { + var receiver models.TerraformDeployment + if err := s.db.Where("id = ?", id).First(&receiver).Error; err != nil { + return err + } + receiver.LastOperationState = "failed" + receiver.LastOperationMessage = FailedMessage + + err := s.db.Save(receiver).Error + if err != nil { + return err + } + logger.Info("mark-as-failed", lager.Data{"workspace_id": id}) + } + return err + } + +} diff --git 
a/internal/storage/recover_in_progress_operations_test.go b/internal/storage/recover_in_progress_operations_test.go new file mode 100644 index 000000000..9ad4fa919 --- /dev/null +++ b/internal/storage/recover_in_progress_operations_test.go @@ -0,0 +1,98 @@ +package storage_test + +import ( + "os" + + "code.cloudfoundry.org/lager/v3/lagertest" + "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" + "github.com/cloudfoundry/cloud-service-broker/v2/internal/storage" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +const ( + recoverID = "fake-id-to-recover" + okID = "fake-id-that-does-not-need-to-be-recovered" +) + +var _ = Describe("RecoverInProgressOperations()", func() { + BeforeEach(func() { + // Setup + Expect(db.Create(&models.TerraformDeployment{ + ID: recoverID, + LastOperationType: "fake-type", + LastOperationState: "in progress", + LastOperationMessage: "fake-type in progress", + }).Error).To(Succeed()) + Expect(db.Create(&models.TerraformDeployment{ + ID: okID, + LastOperationType: "fake-type", + LastOperationState: "succeeded", + LastOperationMessage: "fake-type succeeded", + }).Error).To(Succeed()) + var rowCount int64 + db.Model(&models.TerraformDeployment{}).Count(&rowCount) + Expect(rowCount).To(BeNumerically("==", 2)) + + logger = lagertest.NewTestLogger("test") + store = storage.New(db, encryptor) + }) + + When("running as a cf app", func() { + It("recovers the expected operations", func() { + os.Setenv("CF_INSTANCE_GUID", "something") // The presence of this variable means we are running as an App + defer os.Unsetenv("CF_INSTANCE_GUID") + + // Call the function + Expect(store.RecoverInProgressOperations(logger)).To(Succeed()) + + // Behaviors + By("marking the in-progress operation as failed") + var r1 models.TerraformDeployment + Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) + Expect(r1.LastOperationState).To(Equal("failed")) + Expect(r1.LastOperationMessage).To(Equal("the broker restarted 
while the operation was in progress")) + + By("no updating other operations") + var r2 models.TerraformDeployment + Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) + Expect(r2.LastOperationState).To(Equal("succeeded")) + Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) + + By("logging the expected message") + Expect(logger.Buffer().Contents()).To(SatisfyAll( + ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), + ContainSubstring(`"workspace_id":"fake-id-to-recover"`), + )) + }) + }) + + When("running on a VM", func() { + It("recovers the expected operations", func() { + // When running on a VM there will be a lockfile and record in the db + Expect(store.WriteLockFile(recoverID)).To(Succeed()) + + // Call the function + Expect(store.RecoverInProgressOperations(logger)).To(Succeed()) + + // Behaviors + By("marking the in-progress operation as failed") + var r1 models.TerraformDeployment + Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) + Expect(r1.LastOperationState).To(Equal("failed")) + Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) + + By("no updating other operations") + var r2 models.TerraformDeployment + Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) + Expect(r2.LastOperationState).To(Equal("succeeded")) + Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) + + By("logging the expected message") + Expect(logger.Buffer().Contents()).To(SatisfyAll( + ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), + ContainSubstring(`"workspace_id":"fake-id-to-recover"`), + )) + }) + }) +}) From fcb7872ed47e80dbbeee9665223c1b04da99130d Mon Sep 17 00:00:00 2001 From: nouseforaname <34882943+nouseforaname@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:27:29 +0200 Subject: [PATCH 4/5] update tests to cover shutdown and updated recover in progress --- 
.../fake-uuid-provision.tf | 11 +- .../termination-recovery/manifest.yml | 2 + integrationtest/integrationtest_suite_test.go | 16 +- integrationtest/termination_recovery_test.go | 314 +++++++++++------- internal/storage/storage_suite_test.go | 2 + pkg/providers/tf/deployment_manager_test.go | 7 +- 6 files changed, 232 insertions(+), 120 deletions(-) diff --git a/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf b/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf index db12d4a33..ba683a089 100644 --- a/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf +++ b/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf @@ -3,9 +3,16 @@ terraform { random = { source = "registry.terraform.io/hashicorp/random" } + null = { + source = "registry.terraform.io/hashicorp/null" + } + } +} +resource "null_resource" "sleeper" { + provisioner "local-exec" { + command = "sleep 10" } } - resource "random_uuid" "random" {} -output provision_output { value = random_uuid.random.result } \ No newline at end of file +output provision_output { value = random_uuid.random.result } diff --git a/integrationtest/fixtures/termination-recovery/manifest.yml b/integrationtest/fixtures/termination-recovery/manifest.yml index 306bb39ab..b5758ffe9 100644 --- a/integrationtest/fixtures/termination-recovery/manifest.yml +++ b/integrationtest/fixtures/termination-recovery/manifest.yml @@ -14,5 +14,7 @@ terraform_binaries: source: https://github.com/opentofu/opentofu/archive/refs/tags/v1.6.0.zip - name: terraform-provider-random version: 3.1.0 +- name: terraform-provider-null + version: 3.2.2 service_definitions: - fake-uuid-service.yml diff --git a/integrationtest/integrationtest_suite_test.go b/integrationtest/integrationtest_suite_test.go index cae7ec363..280c9d235 100644 --- a/integrationtest/integrationtest_suite_test.go +++ b/integrationtest/integrationtest_suite_test.go @@ -30,7 +30,7 @@ var ( var _ = SynchronizedBeforeSuite( func() 
[]byte { // -gcflags enabled "gops", but had to be removed as this doesn't compile with Go 1.19 - //path, err := Build("github.com/cloudfoundry/cloud-service-broker", `-gcflags="all=-N -l"`) + // path, err := Build("github.com/cloudfoundry/cloud-service-broker", `-gcflags="all=-N -l"`) path := must(Build("github.com/cloudfoundry/cloud-service-broker/v2")) return []byte(path) }, @@ -45,8 +45,18 @@ var _ = SynchronizedBeforeSuite( ) var _ = SynchronizedAfterSuite( - func() {}, - func() { CleanupBuildArtifacts() }, + func() { + }, + func() { + CleanupBuildArtifacts() + files, err := filepath.Glob("/tmp/brokerpak*") + Expect(err).ToNot(HaveOccurred()) + for _, f := range files { + if err := os.RemoveAll(f); err != nil { + Expect(err).ToNot(HaveOccurred()) + } + } + }, ) var _ = BeforeEach(func() { diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index 79c547c70..fa356fc17 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -3,6 +3,8 @@ package integrationtest_test import ( "fmt" "net/http" + "os" + "time" "github.com/cloudfoundry/cloud-service-broker/v2/integrationtest/packer" "github.com/cloudfoundry/cloud-service-broker/v2/internal/testdrive" @@ -28,145 +30,233 @@ var _ = Describe("Recovery From Broker Termination", func() { BeforeEach(func() { brokerpak = must(packer.BuildBrokerpak(csb, fixtures("termination-recovery"))) - + }) + BeforeEach(func() { stdout = NewBuffer() stderr = NewBuffer() - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - - DeferCleanup(func() { - Expect(broker.Stop()).To(Succeed()) - cleanup(brokerpak) + }) + Describe("running csb on a VM", func() { + Describe("when a vm broker properly drains", func() { + BeforeEach(func() { + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + + DeferCleanup(func() { + 
Expect(broker.Terminate()).To(Succeed()) + }) + }) + + It("can finish the in flight operation", func() { + By("starting to provision") + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + Eventually(stdout, time.Second*5).Should(Say(`tofu","apply","-auto-approve"`)) + By("gracefully stopping the broker") + // Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped. + go func() { + Expect(broker.Stop()).To(Succeed()) + }() + + By("logging a message") + Eventually(stdout).Should(Say("received SIGTERM")) + Eventually(stdout).Should(Say("draining csb in progress")) + + By("ensuring that the broker rejects requests") + Expect(broker.Client.LastOperation(instanceGUID, uuid.NewString()).Error).To(HaveOccurred()) + + // Fun stuff, do not optimize this with a SatisfyAll().. The relevant part of the docs is: + // When Say succeeds, it fast forwards the gbytes.Buffer's read cursor to just after the successful match. + // meaning if below lines will be partially matched at first attempt, no further attempt can succeed because we + // forwarded past the location of the initial first match. 
+ + Eventually(stdout, time.Second*20).Should(Say(fmt.Sprintf("successfully stored state for tf:%s:", instanceGUID))) + Eventually(stdout, time.Second*20).Should(Say("draining complete")) + Consistently(stderr, time.Second*20).ShouldNot(Say("shutdown error")) + + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + + By("checking that the resource finished successfully") + response = broker.Client.LastOperation(instanceGUID, uuid.NewString()) + Expect(string(response.ResponseBody)).To(ContainSubstring(`{"state":"succeeded","description":"provision succeeded"}`)) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + + By("ensuring SI can be successfully deleted") + si := testdrive.ServiceInstance{GUID: instanceGUID, ServiceOfferingGUID: serviceOfferingGUID, ServicePlanGUID: servicePlanGUID} + Expect(broker.Deprovision(si)).To(Succeed()) + }) + }) + Describe("when a vm broker did not properly drain", func() { + var dirDefault string + BeforeEach(func() { + By("ensuring that the expected lockdir exists") + + dirDefault, _ = os.MkdirTemp("/tmp/", "lockfiles") + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv(fmt.Sprintf("CSB_LOCKFILE_DIR=%s", dirDefault)))) + }) + + It("fails service instances that have a lockfile on start", func() { + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + + Eventually(stdout, time.Second*5).Should(Say(`tofu","apply","-auto-approve"`)) + By("forcefully stopping the broker") + // Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped. 
+ go func() { + Expect(broker.Terminate()).To(Succeed()) + }() + + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv(fmt.Sprintf("CSB_LOCKFILE_DIR=%s", dirDefault)))) + lastOperation, err := broker.LastOperation(instanceGUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + }) }) }) - It("can recover from a terminated create", func() { - By("starting to provision") - instanceGUID := uuid.NewString() - response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instanceGUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) - - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - - // OSBAPI requires that HTTP 409 (Conflict) is returned - By("refusing to allow a duplicate instance") - response = broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusConflict)) - - By("allowing the instance to be cleaned up") - response = broker.Client.Deprovision(instanceGUID, 
serviceOfferingGUID, servicePlanGUID, uuid.NewString()) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusOK)) - }) + Describe("running csb as a CF app", func() { + BeforeEach(func() { + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) + + DeferCleanup(func() { + Expect(broker.Terminate()).To(Succeed()) + }) + }) - It("can recover from a terminated update", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + It("can recover from a terminated create", func() { + By("starting to provision") + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) + + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instanceGUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + + // OSBAPI requires that HTTP 409 (Conflict) is returned + By("refusing to allow a duplicate instance") + response = 
broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusConflict)) + + By("allowing the instance to be cleaned up") + response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + }) - By("starting to update") - response := broker.Client.Update(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil, domain.PreviousValues{}, nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + It("can recover from a terminated update", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("starting to update") + response := broker.Client.Update(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil, domain.PreviousValues{}, nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instance.GUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), 
testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instance.GUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) - By("allowing the operation to be restarted") - Expect(broker.UpdateService(instance)).To(Succeed()) - }) + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - It("can recover from a terminated delete", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + By("allowing the operation to be restarted") + Expect(broker.UpdateService(instance)).To(Succeed()) + }) - By("starting to delete") - response := broker.Client.Deprovision(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + It("can recover from a terminated delete", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("starting to delete") + response := 
broker.Client.Deprovision(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instance.GUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instance.GUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) - By("allowing the operation to be restarted") - Expect(broker.Deprovision(instance)).To(Succeed()) - }) + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - It("can recover from a terminated bind", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + By("allowing the operation to be restarted") + Expect(broker.Deprovision(instance)).To(Succeed()) + }) - By("starting to bind") - 
bindingGUID := uuid.NewString() - go broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + It("can recover from a terminated bind", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Binding".*"binding_id":"%s"`, bindingGUID))) + By("starting to bind") + bindingGUID := uuid.NewString() + go broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Binding".*"binding_id":"%s"`, bindingGUID))) - By("allowing the operation to be restarted") - _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Expect(err).NotTo(HaveOccurred()) - }) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) + + By("allowing the operation to be restarted") + _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + Expect(err).NotTo(HaveOccurred()) + }) - It("can recover from a terminated unbind", func() { - By("successfully provisioning a service instance and binding") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + It("can recover from a terminated unbind", func() { + By("successfully provisioning a service instance and binding") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - bindingGUID := uuid.NewString() - _, err = 
broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Expect(err).NotTo(HaveOccurred()) + bindingGUID := uuid.NewString() + _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + Expect(err).NotTo(HaveOccurred()) - By("starting to unbind") - go broker.DeleteBinding(instance, bindingGUID) + By("starting to unbind") + go broker.DeleteBinding(instance, bindingGUID) - Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Unbinding".*"binding_id":"%s"`, bindingGUID))) + Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Unbinding".*"binding_id":"%s"`, bindingGUID))) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("allowing the operation to be restarted") - Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed()) + By("allowing the operation to be restarted") + Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed()) + }) }) }) diff --git a/internal/storage/storage_suite_test.go b/internal/storage/storage_suite_test.go index c02209966..31728a8c0 100644 --- a/internal/storage/storage_suite_test.go +++ b/internal/storage/storage_suite_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "code.cloudfoundry.org/lager/v3/lagertest" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "gorm.io/driver/sqlite" @@ -19,6 +20,7 @@ var ( db *gorm.DB encryptor *storagefakes.FakeEncryptor store *storage.Storage + logger *lagertest.TestLogger ) func TestStorage(t *testing.T) { diff --git a/pkg/providers/tf/deployment_manager_test.go b/pkg/providers/tf/deployment_manager_test.go index 0e04c3b20..30c8d5eb5 100644 --- a/pkg/providers/tf/deployment_manager_test.go +++ b/pkg/providers/tf/deployment_manager_test.go @@ -205,7 +205,8 @@ var _ = Describe("DeploymentManager", func() { storedDeployment := fakeStore.StoreTerraformDeploymentArgsForCall(0) Expect(storedDeployment.LastOperationState).To(Equal("succeeded")) Expect(storedDeployment.LastOperationMessage).To(Equal("provision succeeded: apply completed successfully")) - Expect(fakeLogger.Logs()).To(BeEmpty()) + Expect(fakeLogger.Logs()).To(HaveLen(1)) + Expect(fakeLogger.Logs()[0].Message).To(Equal("broker.successfully stored state for deploymentID")) }) }) @@ -222,12 +223,12 @@ var _ = Describe("DeploymentManager", func() { Expect(storedDeployment.LastOperationType).To(Equal(existingDeployment.LastOperationType)) Expect(storedDeployment.LastOperationState).To(Equal("failed")) Expect(storedDeployment.LastOperationMessage).To(Equal("provision failed: operation failed dramatically")) - Expect(fakeLogger.Logs()).To(HaveLen(1)) + Expect(fakeLogger.Logs()).To(HaveLen(2)) Expect(fakeLogger.Logs()[0].Message).To(ContainSubstring("operation-failed")) Expect(fakeLogger.Logs()[0].Data).To(HaveKeyWithValue("error", Equal("operation failed dramatically"))) Expect(fakeLogger.Logs()[0].Data).To(HaveKeyWithValue("message", Equal("provision failed: operation failed dramatically"))) Expect(fakeLogger.Logs()[0].Data).To(HaveKeyWithValue("deploymentID", Equal(existingDeployment.ID))) - + Expect(fakeLogger.Logs()[1].Message).To(Equal("broker.successfully stored state for deploymentID")) }) }) }) From f50945ea1207f5cbb0c194ee8dbdb5cd12e9cc61 Mon Sep 17 00:00:00 2001 From: Felisia Martini Date: Thu, 12 
Sep 2024 16:45:13 +0100 Subject: [PATCH 5/5] Changes to address code review --- integrationtest/termination_recovery_test.go | 6 +- .../storage/recover_in_progress_operations.go | 79 +++++++++++-------- internal/storage/terraform_deployment.go | 15 ++-- 3 files changed, 59 insertions(+), 41 deletions(-) diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index fa356fc17..b8b1d64c3 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -30,11 +30,11 @@ var _ = Describe("Recovery From Broker Termination", func() { BeforeEach(func() { brokerpak = must(packer.BuildBrokerpak(csb, fixtures("termination-recovery"))) - }) - BeforeEach(func() { + stdout = NewBuffer() stderr = NewBuffer() }) + Describe("running csb on a VM", func() { Describe("when a vm broker properly drains", func() { BeforeEach(func() { @@ -56,6 +56,7 @@ var _ = Describe("Recovery From Broker Termination", func() { By("gracefully stopping the broker") // Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped. go func() { + defer GinkgoRecover() Expect(broker.Stop()).To(Succeed()) }() @@ -107,6 +108,7 @@ var _ = Describe("Recovery From Broker Termination", func() { By("forcefully stopping the broker") // Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped. 
go func() { + defer GinkgoRecover() Expect(broker.Terminate()).To(Succeed()) }() diff --git a/internal/storage/recover_in_progress_operations.go b/internal/storage/recover_in_progress_operations.go index ae8f48c9c..e532fe48c 100644 --- a/internal/storage/recover_in_progress_operations.go +++ b/internal/storage/recover_in_progress_operations.go @@ -13,44 +13,55 @@ const FailedMessage = "the broker restarted while the operation was in progress" func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error { logger = logger.Session("recover-in-progress-operations") - // We only wan't to fail interrupted service instances if we detect that we run as a CF APP. - // VM based csb instances implement a drain mechanism and should need this. Additionally VM - // based csb deployments are scalable horizontally and the below would fail in flight instances - // of another csb process. - if os.Getenv("CF_INSTANCE_GUID") != "" { - var terraformDeploymentBatch []models.TerraformDeployment - result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { - for i := range terraformDeploymentBatch { - terraformDeploymentBatch[i].LastOperationState = "failed" - terraformDeploymentBatch[i].LastOperationMessage = FailedMessage - logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) - } - - return tx.Save(&terraformDeploymentBatch).Error - }) - - return result.Error + if runningAsCFApp() { + return s.markAllInProgressOperationsAsFailed(logger) } else { - deploymentIds, err := s.GetLockedDeploymentIds() - if err != nil { - return err - } + return s.markAllOperationsWithLockFilesAsFailed(logger) + } + +} + +func runningAsCFApp() bool { + return os.Getenv("CF_INSTANCE_GUID") != "" +} - for _, id := range deploymentIds { - var receiver models.TerraformDeployment - if err := s.db.Where("id = ?", id).First(&receiver).Error; err != nil { - return err - } - 
receiver.LastOperationState = "failed" - receiver.LastOperationMessage = FailedMessage - - err := s.db.Save(receiver).Error - if err != nil { - return err - } - logger.Info("mark-as-failed", lager.Data{"workspace_id": id}) +func (s *Storage) markAllInProgressOperationsAsFailed(logger lager.Logger) error { + logger.Info("checking all in in progress operations from DB") + var terraformDeploymentBatch []models.TerraformDeployment + result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { + for i := range terraformDeploymentBatch { + terraformDeploymentBatch[i].LastOperationState = "failed" + terraformDeploymentBatch[i].LastOperationMessage = FailedMessage + logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) } + + return tx.Save(&terraformDeploymentBatch).Error + }) + + return result.Error +} + +func (s *Storage) markAllOperationsWithLockFilesAsFailed(logger lager.Logger) error { + logger.Info("checking all in in progress operations from lockfiles") + + deploymentIds, err := s.GetLockedDeploymentIds() + if err != nil { return err } + for _, id := range deploymentIds { + var receiver models.TerraformDeployment + if err := s.db.Where("id = ?", id).First(&receiver).Error; err != nil { + return err + } + receiver.LastOperationState = "failed" + receiver.LastOperationMessage = FailedMessage + + err := s.db.Save(receiver).Error + if err != nil { + return err + } + logger.Info("mark-as-failed", lager.Data{"workspace_id": id}) + } + return err } diff --git a/internal/storage/terraform_deployment.go b/internal/storage/terraform_deployment.go index 223793293..c6e8735c6 100644 --- a/internal/storage/terraform_deployment.go +++ b/internal/storage/terraform_deployment.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "os" + "path/filepath" "strings" "time" @@ -165,22 +166,26 @@ func (s *Storage) LockFilesExist() bool { } func (s *Storage) 
WriteLockFile(deploymentID string) error { - return os.WriteFile(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID)), []byte{}, 0o644) + return os.WriteFile(filepath.Join(s.lockFileDir, fileNameFromDeploymentID(deploymentID)), []byte{}, 0o644) } func (s *Storage) RemoveLockFile(deploymentID string) error { - return os.Remove(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID))) + return os.Remove(filepath.Join(s.lockFileDir, fileNameFromDeploymentID(deploymentID))) } func (s *Storage) GetLockedDeploymentIds() ([]string, error) { entries, err := os.ReadDir(s.lockFileDir) var names []string for _, entry := range entries { - names = append(names, strings.ReplaceAll(entry.Name(), "_", ":")) + names = append(names, deploymentIDFromFileName(entry.Name())) } return names, err } -func sanitizeFileName(name string) string { - return strings.ReplaceAll(name, ":", "_") +func fileNameFromDeploymentID(deploymentID string) string { + return strings.ReplaceAll(deploymentID, ":", "_") +} + +func deploymentIDFromFileName(fileName string) string { + return strings.ReplaceAll(fileName, "_", ":") }