From 290446f725197348aa8ea00f3ab4471223636854 Mon Sep 17 00:00:00 2001 From: ifindlay-cci Date: Tue, 3 Sep 2024 13:35:21 +0100 Subject: [PATCH 1/7] feat: added tls support to cloud service broker app When running as an app in CF we can rely on the platform to handle TLS setup, but on a VM currently there is no way to have encrypted traffic. TPCF-26820 --- cmd/serve.go | 25 ++++++- docs/configuration.md | 2 + integrationtest/server_test.go | 66 +++++++++++++++++ internal/testdrive/broker_start.go | 110 ++++++++++++++++++++++++++++- 4 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 integrationtest/server_test.go diff --git a/cmd/serve.go b/cmd/serve.go index feb61bf7b..40262c9b3 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -51,6 +51,8 @@ const ( apiPasswordProp = "api.password" apiPortProp = "api.port" apiHostProp = "api.host" + tlsCertCaBundleProp = "api.certCaBundle" + tlsKeyProp = "api.tlsKey" encryptionPasswords = "db.encryption.passwords" encryptionEnabled = "db.encryption.enabled" ) @@ -84,6 +86,8 @@ func init() { _ = viper.BindEnv(apiHostProp, "CSB_LISTENER_HOST") _ = viper.BindEnv(encryptionPasswords, "ENCRYPTION_PASSWORDS") _ = viper.BindEnv(encryptionEnabled, "ENCRYPTION_ENABLED") + _ = viper.BindEnv(tlsCertCaBundleProp, "TLS_CERT_CHAIN") + _ = viper.BindEnv(tlsKeyProp, "TLS_PRIVATE_KEY") } func serve() { @@ -210,7 +214,26 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H port := viper.GetString(apiPortProp) host := viper.GetString(apiHostProp) logger.Info("Serving", lager.Data{"port": port}) - _ = http.ListenAndServe(fmt.Sprintf("%s:%s", host, port), router) + + tlsCertCaBundle := viper.GetString(tlsCertCaBundleProp) + tlsKey := viper.GetString(tlsKeyProp) + + logger.Info("tlsCertCaBundle", lager.Data{"tlsCertCaBundle": tlsCertCaBundle}) + logger.Info("tlsKey", lager.Data{"tlsKey": tlsKey}) + + httpServer := &http.Server{ + Addr: fmt.Sprintf("%s:%s", host, port), + Handler: router, + } + + if 
tlsCertCaBundle != "" && tlsKey != "" { + err := httpServer.ListenAndServeTLS(tlsCertCaBundle, tlsKey) + if err != nil { + logger.Fatal("Failed to start broker", err) + } + } else { + _ = httpServer.ListenAndServe() + } } func labelName(label string) string { diff --git a/docs/configuration.md b/docs/configuration.md index e5f164873..65d0db48b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -83,6 +83,8 @@ Broker service configuration values: | SECURITY_USER_NAME * | api.user | string |

Broker authentication username

| | SECURITY_USER_PASSWORD * | api.password | string |

Broker authentication password

| | PORT | api.port | string |

Port to bind broker to

| +| TLS_CERT_CHAIN | api.certCaBundle | string |

File path to a PEM-encoded certificate chain

| +| TLS_PRIVATE_KEY | api.tlsKey | string |

File path to a PEM-encoded private key

| ## Feature flags Configuration diff --git a/integrationtest/server_test.go b/integrationtest/server_test.go new file mode 100644 index 000000000..7d5a889ed --- /dev/null +++ b/integrationtest/server_test.go @@ -0,0 +1,66 @@ +package integrationtest_test + +import ( + "fmt" + "net/http" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/cloudfoundry/cloud-service-broker/v2/integrationtest/packer" + "github.com/cloudfoundry/cloud-service-broker/v2/internal/testdrive" +) + +var _ = Describe("Starting Server", func() { + + const userProvidedPlan = `[{"name": "user-plan-unique","id":"8b52a460-b246-11eb-a8f5-d349948e2481"}]` + + var brokerpak string + + BeforeEach(func() { + brokerpak = must(packer.BuildBrokerpak(csb, fixtures("service-catalog"))) + + DeferCleanup(func() { + cleanup(brokerpak) + }) + }) + + When("TLS data is provided", func() { + When("Valid data exists", func() { + It("Should accept HTTPS requests", func() { + isValid := true + broker, err := testdrive.StartBroker(csb, brokerpak, database, testdrive.WithTLSConfig(isValid), testdrive.WithEnv(fmt.Sprintf("GSB_SERVICE_ALPHA_SERVICE_PLANS=%s", userProvidedPlan)), testdrive.WithOutputs(GinkgoWriter, GinkgoWriter)) + Expect(err).NotTo(HaveOccurred()) + + _, err = http.Get(fmt.Sprintf("https://localhost:%d", broker.Port)) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + When("Invalid data exists", func() { + It("Should fail to start", func() { + notValid := false + _, err := testdrive.StartBroker(csb, brokerpak, database, testdrive.WithTLSConfig(notValid), testdrive.WithEnv(fmt.Sprintf("GSB_SERVICE_ALPHA_SERVICE_PLANS=%s", userProvidedPlan)), testdrive.WithOutputs(GinkgoWriter, GinkgoWriter)) + Expect(err).To(HaveOccurred()) + }) + }) + }) + + When("No TLS data is provided", func() { + It("Should return an error for HTTPS requests", func() { + broker, err := testdrive.StartBroker(csb, brokerpak, database, testdrive.WithEnv(fmt.Sprintf("GSB_SERVICE_ALPHA_SERVICE_PLANS=%s", 
userProvidedPlan)), testdrive.WithOutputs(GinkgoWriter, GinkgoWriter)) + Expect(err).NotTo(HaveOccurred()) + + _, err = http.Get(fmt.Sprintf("https://localhost:%d", broker.Port)) + Expect(err).To(HaveOccurred()) + }) + + It("Should succeed for HTTP requests", func() { + broker, err := testdrive.StartBroker(csb, brokerpak, database, testdrive.WithEnv(fmt.Sprintf("GSB_SERVICE_ALPHA_SERVICE_PLANS=%s", userProvidedPlan)), testdrive.WithOutputs(GinkgoWriter, GinkgoWriter)) + Expect(err).NotTo(HaveOccurred()) + + _, err = http.Get(fmt.Sprintf("http://localhost:%d", broker.Port)) + Expect(err).NotTo(HaveOccurred()) + }) + }) +}) diff --git a/internal/testdrive/broker_start.go b/internal/testdrive/broker_start.go index edc506cd1..d6b356875 100644 --- a/internal/testdrive/broker_start.go +++ b/internal/testdrive/broker_start.go @@ -2,14 +2,24 @@ package testdrive import ( "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" "fmt" "io" + "math/big" + "net" "net/http" "os" "os/exec" "strings" "time" + . 
"github.com/onsi/gomega" + "github.com/cloudfoundry/cloud-service-broker/v2/pkg/client" "github.com/cloudfoundry/cloud-service-broker/v2/utils/freeport" "github.com/google/uuid" @@ -83,7 +93,16 @@ func StartBroker(csbPath, bpk, db string, opts ...StartBrokerOption) (*Broker, e start := time.Now() for { - response, err := http.Head(fmt.Sprintf("http://localhost:%d", port)) + scheme := "http" + for _, envVar := range cmd.Env { + if strings.HasPrefix(envVar, "TLS_") { + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + scheme = "https" + break + } + } + + response, err := http.Head(fmt.Sprintf("%s://localhost:%d", scheme, port)) switch { case err == nil && response.StatusCode == http.StatusOK: return &broker, nil @@ -99,6 +118,95 @@ func StartBroker(csbPath, bpk, db string, opts ...StartBrokerOption) (*Broker, e } } +func createCAKeyPair(msg string) (*x509.Certificate, []byte, *rsa.PrivateKey) { + ca := &x509.Certificate{ + SerialNumber: big.NewInt(2019), + Subject: pkix.Name{ + Country: []string{msg}, + }, + IsCA: true, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(10, 0, 0), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + BasicConstraintsValid: true, + } + + caPrivKey, err := rsa.GenerateKey(rand.Reader, 4096) + Expect(err).NotTo(HaveOccurred()) + + caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey) + Expect(err).NotTo(HaveOccurred()) + + caPEM, _ := encodeKeyPair(caBytes, x509.MarshalPKCS1PrivateKey(caPrivKey)) + + return ca, caPEM, caPrivKey +} + +func createKeyPairSignedByCA(ca *x509.Certificate, caPrivKey *rsa.PrivateKey) ([]byte, []byte) { + cert := &x509.Certificate{ + SerialNumber: big.NewInt(1658), + Subject: pkix.Name{ + Country: []string{"GB"}, + }, + DNSNames: []string{"localhost"}, + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)}, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(10, 0, 0), + ExtKeyUsage: 
[]x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + } + + certPrivKey, err := rsa.GenerateKey(rand.Reader, 4096) + Expect(err).NotTo(HaveOccurred()) + + certBytes, err := x509.CreateCertificate(rand.Reader, cert, ca, &certPrivKey.PublicKey, caPrivKey) + Expect(err).NotTo(HaveOccurred()) + + certPEM, certPrivKeyPEM := encodeKeyPair(certBytes, x509.MarshalPKCS1PrivateKey(certPrivKey)) + return certPEM, certPrivKeyPEM +} + +func encodeKeyPair(caBytes, caPrivKeyBytes []byte) ([]byte, []byte) { + caPEM := pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: caBytes, + }) + + caPrivKeyPEM := pem.EncodeToMemory(&pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: caPrivKeyBytes, + }) + + return caPEM, caPrivKeyPEM +} + +func WithTLSConfig(isValid bool) StartBrokerOption { + return func(cfg *startBrokerConfig) { + ca, _, caPrivKey := createCAKeyPair("US") + + serverCert, serverPrivKey := createKeyPairSignedByCA(ca, caPrivKey) + + certFileBuf, err := os.CreateTemp("", "") + Expect(err).NotTo(HaveOccurred()) + defer certFileBuf.Close() + + privKeyFileBuf, err := os.CreateTemp("", "") + Expect(err).NotTo(HaveOccurred()) + defer privKeyFileBuf.Close() + + if !isValid { + serverPrivKey[10] = 'a' + } + + Expect(os.WriteFile(privKeyFileBuf.Name(), serverPrivKey, 0644)).To(Succeed()) + + Expect(os.WriteFile(certFileBuf.Name(), serverCert, 0644)).To(Succeed()) + + cfg.env = append(cfg.env, fmt.Sprintf("TLS_CERT_CHAIN=%s", certFileBuf.Name())) + cfg.env = append(cfg.env, fmt.Sprintf("TLS_PRIVATE_KEY=%s", privKeyFileBuf.Name())) + } +} + func WithEnv(extraEnv ...string) StartBrokerOption { return func(cfg *startBrokerConfig) { cfg.env = append(cfg.env, extraEnv...) 
From a4f7e836449097138b3ac170077eefc7847cc4cf Mon Sep 17 00:00:00 2001 From: nouseforaname <34882943+nouseforaname@users.noreply.github.com> Date: Tue, 3 Sep 2024 15:21:47 +0200 Subject: [PATCH 2/7] ignore linting for test helper it is angry about `.` imports. But we do not mind because this is not production code. --- internal/testdrive/broker_start.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/testdrive/broker_start.go b/internal/testdrive/broker_start.go index d6b356875..97172d46b 100644 --- a/internal/testdrive/broker_start.go +++ b/internal/testdrive/broker_start.go @@ -18,6 +18,7 @@ import ( "strings" "time" + //lint:ignore ST1001 we do not care because this is a test helper . "github.com/onsi/gomega" "github.com/cloudfoundry/cloud-service-broker/v2/pkg/client" From 4a35489e8160461db07427a077b5d14a7fa9e61b Mon Sep 17 00:00:00 2001 From: Iain Findlay Date: Wed, 4 Sep 2024 15:59:27 +0200 Subject: [PATCH 3/7] feat: enable gracefull shutdowns of the broker when a csb app that is running in cf is stopped outside of it's own lifecycle ( e.g. the diego cell is redeployed) we do not have a great way of ensuring that all in flight terraform executions will be able to finish their work and write back the resulting tf state to the csb DB. Diego assumes that an app will gracefully shutdown within 10s of receiving SIGTERM, if that is not the case, the App will receive a SIGKILL and stop abruptly That creates orphaned resources in the underlying IaaS that cannot be cleaned up by the csb because the CSB does not have the tfstate for the terraform resources that were in flight when the CSB got shutdown. To aleviate this issue, this introduces a graceful shutdown sequence and and lockfiles on disk ( to be consumed by a drain script ). This enables to deploy the CSB as a workload on a bosh instance. Instead of marking specific SI instances as failed, this ensures that the broker will - stop accepting new requests - finish all in flight TF before shutdown. 
The drain script can be kept simple by inspecting a folder. If that folder is empty, it is safe to proceed to stop the CSB. We also tried a drain script based on inspecting the processes running ( e.g. if a tofu or provider binary is still being executed ). Though that seems potentially unreliable ( since there could be time of check // time of use issues ) that falsely suggest that everything is finished ( e.g. because we checked right between two invocations of the provider / tofu binaries ) - fly-by: some structs got their fiels reordered to improve their memory footprint. --- cmd/serve.go | 58 +++- dbservice/dbservice.go | 11 +- .../fake-uuid-provision.tf | 11 +- .../termination-recovery/manifest.yml | 2 + integrationtest/termination_recovery_test.go | 276 ++++++++++-------- internal/storage/storage.go | 24 +- internal/storage/terraform_deployment.go | 19 ++ internal/testdrive/broker.go | 11 +- internal/testdrive/broker_start.go | 6 +- internal/testdrive/runner.go | 22 +- pkg/broker/service_provider.go | 2 + pkg/providers/tf/deployment_manager.go | 11 +- 12 files changed, 315 insertions(+), 138 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 40262c9b3..0557a8918 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -22,7 +22,11 @@ import ( "io" "log/slog" "net/http" + "os" + "os/signal" "strings" + "syscall" + "time" "code.cloudfoundry.org/lager/v3" osbapiBroker "github.com/cloudfoundry/cloud-service-broker/v2/brokerapi/broker" @@ -55,6 +59,8 @@ const ( tlsKeyProp = "api.tlsKey" encryptionPasswords = "db.encryption.passwords" encryptionEnabled = "db.encryption.enabled" + + shutdownTimeout = time.Hour ) var cfCompatibilityToggle = toggles.Features.Toggle("enable-cf-sharing", false, `Set all services to have the Sharable flag so they can be shared @@ -102,7 +108,8 @@ func serve() { logger.Fatal("Error initializing service broker config", err) } var serviceBroker domain.ServiceBroker - serviceBroker, err = osbapiBroker.New(cfg, storage.New(db, encryptor), 
logger) + csbStore := storage.New(db, encryptor) + serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger) if err != nil { logger.Fatal("Error initializing service broker", err) } @@ -226,13 +233,50 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H Handler: router, } - if tlsCertCaBundle != "" && tlsKey != "" { - err := httpServer.ListenAndServeTLS(tlsCertCaBundle, tlsKey) - if err != nil { - logger.Fatal("Failed to start broker", err) + go func() { + var err error + if tlsCertCaBundle != "" && tlsKey != "" { + err = httpServer.ListenAndServeTLS(tlsCertCaBundle, tlsKey) + } else { + err = httpServer.ListenAndServe() } - } else { - _ = httpServer.ListenAndServe() + if err == http.ErrServerClosed { + logger.Info("shutting down csb") + } + }() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGKILL, syscall.SIGTERM) + + signalReceived := <-sigChan + + switch signalReceived { + + case syscall.SIGTERM: + logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish") + + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout) + defer shutdownRelease() + for { + if store.LockFilesExist() { + logger.Info("draining csb instance") + time.Sleep(time.Second * 1) + break + } + } + logger.Info("draining complete") + if err := httpServer.Shutdown(shutdownCtx); err != nil { + logger.Fatal("shutdown error: %v", err) + } + + logger.Info("shutdown complete") + case syscall.SIGKILL: + logger.Info("received SIGKILL, server is shutting down immediately. 
In flight operations will not finish and their state is potentially lost.") + if err := httpServer.Close(); err != nil { + logger.Error("shutdown", err) + } + default: + logger.Info("csb does not handle the interrupt signal") } } diff --git a/dbservice/dbservice.go b/dbservice/dbservice.go index 10b1a8faa..9fdea783d 100644 --- a/dbservice/dbservice.go +++ b/dbservice/dbservice.go @@ -17,6 +17,7 @@ package dbservice import ( "fmt" + "os" "sync" "code.cloudfoundry.org/lager/v3" @@ -43,8 +44,14 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB { if err := RunMigrations(db); err != nil { panic(fmt.Sprintf("Error migrating database: %s", err)) } - if err := recoverInProgressOperations(db, logger); err != nil { - panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) + // We only wan't to fail interrupted service instances if we detect that we run as a CF APP. + // VM based csb instances implement a drain mechanism and should need this. Additionally VM + // based csb deployments are scalable horizontally and the below would fail in flight instances + // of another csb process. 
+ if os.Getenv("CF_INSTANCE_GUID") != "" { + if err := recoverInProgressOperations(db, logger); err != nil { + panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) + } } }) return db diff --git a/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf b/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf index db12d4a33..1538fb5f9 100644 --- a/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf +++ b/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf @@ -3,9 +3,16 @@ terraform { random = { source = "registry.terraform.io/hashicorp/random" } + null = { + source = "registry.terraform.io/hashicorp/null" + } + } +} +resource "null_resource" "sleeper" { + provisioner "local-exec" { + command = "sleep 5" } } - resource "random_uuid" "random" {} -output provision_output { value = random_uuid.random.result } \ No newline at end of file +output provision_output { value = random_uuid.random.result } diff --git a/integrationtest/fixtures/termination-recovery/manifest.yml b/integrationtest/fixtures/termination-recovery/manifest.yml index 306bb39ab..b5758ffe9 100644 --- a/integrationtest/fixtures/termination-recovery/manifest.yml +++ b/integrationtest/fixtures/termination-recovery/manifest.yml @@ -14,5 +14,7 @@ terraform_binaries: source: https://github.com/opentofu/opentofu/archive/refs/tags/v1.6.0.zip - name: terraform-provider-random version: 3.1.0 +- name: terraform-provider-null + version: 3.2.2 service_definitions: - fake-uuid-service.yml diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index 79c547c70..c808535e0 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -3,12 +3,14 @@ package integrationtest_test import ( "fmt" "net/http" + "time" "github.com/cloudfoundry/cloud-service-broker/v2/integrationtest/packer" 
"github.com/cloudfoundry/cloud-service-broker/v2/internal/testdrive" "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gbytes" . "github.com/onsi/gomega/gbytes" "github.com/pivotal-cf/brokerapi/v11/domain" ) @@ -25,148 +27,194 @@ var _ = Describe("Recovery From Broker Termination", func() { stdout *Buffer stderr *Buffer ) + Describe("running csb on a VM", func() { + BeforeEach(func() { + brokerpak = must(packer.BuildBrokerpak(csb, fixtures("termination-recovery"))) + + stdout = gbytes.NewBuffer() + stderr = gbytes.NewBuffer() + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + + DeferCleanup(func() { + Expect(broker.Terminate()).To(Succeed()) + cleanup(brokerpak) + }) + }) + FIt("can can finish the in flight operation", func() { + By("starting to provision") + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - BeforeEach(func() { - brokerpak = must(packer.BuildBrokerpak(csb, fixtures("termination-recovery"))) - - stdout = NewBuffer() - stderr = NewBuffer() - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + Eventually(stdout, time.Second*5).Should(gbytes.Say(`tofu","apply","-auto-approve"`)) + By("gracefully stopping the broker") - DeferCleanup(func() { Expect(broker.Stop()).To(Succeed()) - cleanup(brokerpak) + + By("ensuring that the broker rejects requests") + Expect(broker.Client.LastOperation(instanceGUID, uuid.NewString()).Error).To(HaveOccurred()) + + By("logging a message") + Expect(string(stdout.Contents())).To(ContainSubstring("received SIGTERM")) + Expect(string(stdout.Contents())).To(ContainSubstring("draining csb")) + Expect(string(stdout.Contents())).To(ContainSubstring("draining 
complete")) + Expect(string(stdout.Contents())).ToNot(ContainSubstring("shutdown error")) + + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + + By("checking that the resource finished succesfully") + response = broker.Client.LastOperation(instanceGUID, uuid.NewString()) + Expect(string(response.ResponseBody)).To(ContainSubstring(`{"state":"succeeded","description":"provision succeeded"}`)) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + + By("ensuring SI can be succesfully deleted") }) }) + Describe("running csb as a CF app", func() { + BeforeEach(func() { + brokerpak = must(packer.BuildBrokerpak(csb, fixtures("termination-recovery"))) + + stdout = NewBuffer() + stderr = NewBuffer() + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) + + DeferCleanup(func() { + Expect(broker.Terminate()).To(Succeed()) + cleanup(brokerpak) + }) + }) - It("can recover from a terminated create", func() { - By("starting to provision") - instanceGUID := uuid.NewString() - response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instanceGUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) - - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, 
instanceGUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - - // OSBAPI requires that HTTP 409 (Conflict) is returned - By("refusing to allow a duplicate instance") - response = broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusConflict)) - - By("allowing the instance to be cleaned up") - response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusOK)) - }) + It("can recover from a terminated create", func() { + By("starting to provision") + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) + + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instanceGUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("received SIGKILL"), ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + + // OSBAPI requires that HTTP 409 (Conflict) is returned + 
By("refusing to allow a duplicate instance") + response = broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusConflict)) + + By("allowing the instance to be cleaned up") + response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + }) - It("can recover from a terminated update", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + It("can recover from a terminated update", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("starting to update") - response := broker.Client.Update(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil, domain.PreviousValues{}, nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + By("starting to update") + response := broker.Client.Update(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil, domain.PreviousValues{}, nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("reporting that an 
operation failed") - lastOperation, err := broker.LastOperation(instance.GUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instance.GUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - By("allowing the operation to be restarted") - Expect(broker.UpdateService(instance)).To(Succeed()) - }) + By("allowing the operation to be restarted") + Expect(broker.UpdateService(instance)).To(Succeed()) + }) - It("can recover from a terminated delete", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + It("can recover from a terminated delete", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("starting to delete") - response := broker.Client.Deprovision(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + By("starting to delete") + response := 
broker.Client.Deprovision(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instance.GUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instance.GUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - By("allowing the operation to be restarted") - Expect(broker.Deprovision(instance)).To(Succeed()) - }) + By("allowing the operation to be restarted") + Expect(broker.Deprovision(instance)).To(Succeed()) + }) - It("can recover from a terminated bind", func() { - 
By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + It("can recover from a terminated bind", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("starting to bind") - bindingGUID := uuid.NewString() - go broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + By("starting to bind") + bindingGUID := uuid.NewString() + go broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Binding".*"binding_id":"%s"`, bindingGUID))) + Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Binding".*"binding_id":"%s"`, bindingGUID))) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("allowing the operation to be restarted") - _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Expect(err).NotTo(HaveOccurred()) - }) + By("allowing the operation to be restarted") + _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + Expect(err).NotTo(HaveOccurred()) + }) - It("can recover from a terminated unbind", func() { - By("successfully provisioning a service instance and binding") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + It("can recover from a terminated unbind", func() { + By("successfully provisioning a service 
instance and binding") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - bindingGUID := uuid.NewString() - _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Expect(err).NotTo(HaveOccurred()) + bindingGUID := uuid.NewString() + _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + Expect(err).NotTo(HaveOccurred()) - By("starting to unbind") - go broker.DeleteBinding(instance, bindingGUID) + By("starting to unbind") + go broker.DeleteBinding(instance, bindingGUID) - Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Unbinding".*"binding_id":"%s"`, bindingGUID))) + Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Unbinding".*"binding_id":"%s"`, bindingGUID))) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("allowing the operation to be restarted") - Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed()) + By("allowing the operation to be restarted") + Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed()) + }) }) }) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 96a992208..5efc7c53c 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,17 +1,31 @@ // Package storage implements a Database Access Object (DAO) package storage -import "gorm.io/gorm" +import ( + "os" + + "gorm.io/gorm" +) type Storage struct { - db *gorm.DB - encryptor Encryptor + db *gorm.DB + encryptor Encryptor + lockFileDir string } func New(db *gorm.DB, encryptor 
Encryptor) *Storage {
+	// the VM based HA deployment requires a drain mechanism. LockFiles are a simple solution.
+	// but not every environment will opt for using VM based deployments. So detect if the lockfile
+	// directory is present.
+	dirDefault := "/var/vcap/data/csb-app-locks"
+	if _, err := os.Stat(dirDefault); err != nil {
+		os.Mkdir("/tmp/csb", 0o755)
+		dirDefault = "/tmp/csb"
+	}
 	return &Storage{
-		db:        db,
-		encryptor: encryptor,
+		db:          db,
+		encryptor:   encryptor,
+		lockFileDir: dirDefault,
 	}
 }
diff --git a/internal/storage/terraform_deployment.go b/internal/storage/terraform_deployment.go
index 9d6e022aa..b733cab87 100644
--- a/internal/storage/terraform_deployment.go
+++ b/internal/storage/terraform_deployment.go
@@ -2,6 +2,8 @@ package storage
 
 import (
 	"fmt"
+	"os"
+	"strings"
 	"time"
 
 	"github.com/hashicorp/go-version"
@@ -156,3 +158,20 @@ func (s *Storage) loadTerraformDeploymentIfExists(id string, receiver any) error
 
 	return s.db.Where("id = ?", id).First(receiver).Error
 }
+
+func (s *Storage) LockFilesExist() bool {
+	entries, _ := os.ReadDir(s.lockFileDir)
+	return len(entries) == 0
+}
+
+func (s *Storage) WriteLockFile(deploymentID string) error {
+	return os.WriteFile(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID)), []byte{}, 0o644)
+}
+
+func (s *Storage) RemoveLockFile(deploymentID string) error {
+	return os.Remove(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID)))
+}
+
+func sanitizeFileName(name string) string {
+	return strings.ReplaceAll(name, ":", "_")
+}
diff --git a/internal/testdrive/broker.go b/internal/testdrive/broker.go
index 416d70deb..f2259483b 100644
--- a/internal/testdrive/broker.go
+++ b/internal/testdrive/broker.go
@@ -23,6 +23,15 @@ func (b *Broker) Stop() error {
 	switch {
 	case b == nil, b.runner == nil:
 		return nil
 	default:
-		return b.runner.stop()
+		return b.runner.gracefullStop()
+	}
+}
+
+func (b *Broker) Terminate() error {
+	switch {
+	case b == nil, b.runner == nil:
+		return nil
+	default:
+		
return b.runner.forceStop() } } diff --git a/internal/testdrive/broker_start.go b/internal/testdrive/broker_start.go index 97172d46b..77acd13c9 100644 --- a/internal/testdrive/broker_start.go +++ b/internal/testdrive/broker_start.go @@ -108,7 +108,7 @@ func StartBroker(csbPath, bpk, db string, opts ...StartBrokerOption) (*Broker, e case err == nil && response.StatusCode == http.StatusOK: return &broker, nil case time.Since(start) > time.Minute: - if err := broker.runner.stop(); err != nil { + if err := broker.runner.forceStop(); err != nil { return nil, err } return nil, fmt.Errorf("timed out after %s waiting for broker to start: %s\n%s", time.Since(start), stdout.String(), stderr.String()) @@ -199,9 +199,9 @@ func WithTLSConfig(isValid bool) StartBrokerOption { serverPrivKey[10] = 'a' } - Expect(os.WriteFile(privKeyFileBuf.Name(), serverPrivKey, 0644)).To(Succeed()) + Expect(os.WriteFile(privKeyFileBuf.Name(), serverPrivKey, 0o644)).To(Succeed()) - Expect(os.WriteFile(certFileBuf.Name(), serverCert, 0644)).To(Succeed()) + Expect(os.WriteFile(certFileBuf.Name(), serverCert, 0o644)).To(Succeed()) cfg.env = append(cfg.env, fmt.Sprintf("TLS_CERT_CHAIN=%s", certFileBuf.Name())) cfg.env = append(cfg.env, fmt.Sprintf("TLS_PRIVATE_KEY=%s", privKeyFileBuf.Name())) diff --git a/internal/testdrive/runner.go b/internal/testdrive/runner.go index 1d4736e3b..68aa89481 100644 --- a/internal/testdrive/runner.go +++ b/internal/testdrive/runner.go @@ -2,6 +2,7 @@ package testdrive import ( "os/exec" + "syscall" "time" ) @@ -26,12 +27,29 @@ func (r *runner) error(err error) *runner { return r } -func (r *runner) stop() error { +func (r *runner) gracefullStop() error { if r.exited { return nil } if r.cmd != nil && r.cmd.Process != nil { - if err := r.cmd.Process.Kill(); err != nil { + if err := r.cmd.Process.Signal(syscall.SIGTERM); err != nil { + return err + } + } + + for !r.exited { + time.Sleep(time.Millisecond) + } + + return nil +} + +func (r *runner) forceStop() error { + if 
r.exited { + return nil + } + if r.cmd != nil && r.cmd.Process != nil { + if err := r.cmd.Process.Signal(syscall.SIGKILL); err != nil { return err } } diff --git a/pkg/broker/service_provider.go b/pkg/broker/service_provider.go index 1cd6ad535..f28cc0758 100644 --- a/pkg/broker/service_provider.go +++ b/pkg/broker/service_provider.go @@ -77,4 +77,6 @@ type ServiceProviderStorage interface { DeleteTerraformDeployment(id string) error ExistsTerraformDeployment(id string) (bool, error) GetServiceBindingIDsForServiceInstance(serviceInstanceID string) ([]string, error) + WriteLockFile(guid string) error + RemoveLockFile(guid string) error } diff --git a/pkg/providers/tf/deployment_manager.go b/pkg/providers/tf/deployment_manager.go index 188581ec5..19247b4e8 100644 --- a/pkg/providers/tf/deployment_manager.go +++ b/pkg/providers/tf/deployment_manager.go @@ -51,7 +51,7 @@ func (d *DeploymentManager) MarkOperationStarted(deployment *storage.TerraformDe return err } - return nil + return d.store.WriteLockFile(deployment.ID) } func (d *DeploymentManager) MarkOperationFinished(deployment *storage.TerraformDeployment, err error) error { @@ -74,8 +74,15 @@ func (d *DeploymentManager) MarkOperationFinished(deployment *storage.TerraformD }) } + err = d.store.StoreTerraformDeployment(*deployment) + if err != nil { + d.logger.Error("store-state", err, lager.Data{ + "deploymentID": deployment.ID, + "message": deployment.LastOperationMessage, + }) + } - return d.store.StoreTerraformDeployment(*deployment) + return d.store.RemoveLockFile(deployment.ID) } func (d *DeploymentManager) OperationStatus(deploymentID string) (bool, string, error) { From ff7aa6ee2698d2266710d60a3685736da4d127be Mon Sep 17 00:00:00 2001 From: ifindlay-cci Date: Wed, 4 Sep 2024 16:18:59 +0100 Subject: [PATCH 4/7] feat: cleaned up lock files before each test run * extended inflight operation test to check deprovision * removed focus test * removed failing check for SIGTERM, in this case SIGKILL is sent - but 
not seen in log --- integrationtest/termination_recovery_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index c808535e0..c54c082dd 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -3,6 +3,7 @@ package integrationtest_test import ( "fmt" "net/http" + "os" "time" "github.com/cloudfoundry/cloud-service-broker/v2/integrationtest/packer" @@ -33,6 +34,7 @@ var _ = Describe("Recovery From Broker Termination", func() { stdout = gbytes.NewBuffer() stderr = gbytes.NewBuffer() + os.RemoveAll("/tmp/csb/") broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) DeferCleanup(func() { @@ -40,7 +42,7 @@ var _ = Describe("Recovery From Broker Termination", func() { cleanup(brokerpak) }) }) - FIt("can can finish the in flight operation", func() { + It("can finish the in flight operation", func() { By("starting to provision") instanceGUID := uuid.NewString() response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) @@ -63,13 +65,15 @@ var _ = Describe("Recovery From Broker Termination", func() { broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - By("checking that the resource finished succesfully") + By("checking that the resource finished successfully") response = broker.Client.LastOperation(instanceGUID, uuid.NewString()) Expect(string(response.ResponseBody)).To(ContainSubstring(`{"state":"succeeded","description":"provision succeeded"}`)) Expect(response.Error).NotTo(HaveOccurred()) Expect(response.StatusCode).To(Equal(http.StatusOK)) - By("ensuring SI can be succesfully deleted") + By("ensuring SI can be successfully deleted") + si := testdrive.ServiceInstance{GUID: instanceGUID, ServiceOfferingGUID: serviceOfferingGUID, ServicePlanGUID: 
servicePlanGUID}
+		Expect(broker.Deprovision(si)).To(Succeed())
 	})
 })
 
 Describe("running csb as a CF app", func() {
@@ -78,6 +82,7 @@ var _ = Describe("Recovery From Broker Termination", func() {
 		stdout = NewBuffer()
 		stderr = NewBuffer()
 
+		os.RemoveAll("/tmp/csb/")
 		broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393")))
 
 		DeferCleanup(func() {
@@ -105,7 +110,7 @@ var _ = Describe("Recovery From Broker Termination", func() {
 
 		By("logging a message")
 		ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID)
-		Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("received SIGKILL"), ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws)))
+		Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws)))
 
 		// OSBAPI requires that HTTP 409 (Conflict) is returned
 		By("refusing to allow a duplicate instance")

From 75c1ca17533dc54bd0373cf672bdad3b47a35ee0 Mon Sep 17 00:00:00 2001
From: ifindlay-cci
Date: Thu, 5 Sep 2024 08:36:27 +0100
Subject: [PATCH 5/7] feat: added unit tests for LockFilesExist

---
 internal/storage/terraform_deployment.go      |  2 +-
 internal/storage/terraform_deployment_test.go | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/internal/storage/terraform_deployment.go b/internal/storage/terraform_deployment.go
index b733cab87..1dc9c28bd 100644
--- a/internal/storage/terraform_deployment.go
+++ b/internal/storage/terraform_deployment.go
@@ -161,7 +161,7 @@ func (s *Storage) loadTerraformDeploymentIfExists(id string, receiver any) error
 
 func (s *Storage) LockFilesExist() bool {
 	entries, _ := os.ReadDir(s.lockFileDir)
-	return len(entries) == 0
+	return len(entries) != 0
 }
 
 func (s *Storage) WriteLockFile(deploymentID string) error {
diff --git a/internal/storage/terraform_deployment_test.go 
b/internal/storage/terraform_deployment_test.go index 1dfe0ada9..97af106ce 100644 --- a/internal/storage/terraform_deployment_test.go +++ b/internal/storage/terraform_deployment_test.go @@ -206,6 +206,23 @@ var _ = Describe("TerraformDeployments", func() { Expect(store.DeleteTerraformDeployment("not-there")).NotTo(HaveOccurred()) }) }) + + Describe("LockFileExists", func() { + FIt("reports correct status", func() { + Expect(store.WriteLockFile("1234")).To(Succeed()) + Expect(store.WriteLockFile("5678")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeTrue()) + + Expect(store.RemoveLockFile("1234")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeTrue()) + + Expect(store.RemoveLockFile("5678")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeFalse()) + }) + }) }) func addFakeTerraformDeployments() { From cc6358203b7822943549fec8c57a3142af85e3f4 Mon Sep 17 00:00:00 2001 From: ifindlay-cci Date: Thu, 5 Sep 2024 09:01:46 +0100 Subject: [PATCH 6/7] bug: drain wait not working as expected A previous commit fixed an issue with LockFilesExist returning an inverted value. The existing drain wait code depended on this incorrect behaviour. 
---
 cmd/serve.go | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/cmd/serve.go b/cmd/serve.go
index 0557a8918..74b005698 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -257,12 +257,9 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H
 	shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout)
 	defer shutdownRelease()
 
-	for {
-		if store.LockFilesExist() {
-			logger.Info("draining csb instance")
-			time.Sleep(time.Second * 1)
-			break
-		}
+	for store.LockFilesExist() {
+		logger.Info("draining csb instance")
+		time.Sleep(time.Second * 1)
 	}
 	logger.Info("draining complete")
 	if err := httpServer.Shutdown(shutdownCtx); err != nil {

From 7d80274f1826d7af590d33f447eeb31fc1e43881 Mon Sep 17 00:00:00 2001
From: Iain Findlay
Date: Thu, 5 Sep 2024 17:26:28 +0200
Subject: [PATCH 7/7] fix: stop accepting requests before draining on SIGTERM

Call httpServer.Shutdown before the lock-file drain loop so the broker
stops accepting new requests as soon as SIGTERM is received, while
Shutdown blocks until in-flight requests have completed. The previous
ordering drained first, so new work could still be accepted while the
broker was shutting down.

---
 cmd/serve.go                                 | 10 ++++------
 integrationtest/termination_recovery_test.go | 15 +++++++++------
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/cmd/serve.go b/cmd/serve.go
index 74b005698..70c40b5d8 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -253,19 +253,17 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H
 
 	switch signalReceived {
 	case syscall.SIGTERM:
-		logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish")
-
 		shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout)
+		if err := httpServer.Shutdown(shutdownCtx); err != nil {
+			logger.Fatal("shutdown error: %v", err)
+		}
+		logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish")
 		defer shutdownRelease()
 		for store.LockFilesExist() {
 			logger.Info("draining csb instance")
 			time.Sleep(time.Second * 1)
 		}
 		logger.Info("draining complete")
-		if err := httpServer.Shutdown(shutdownCtx); err != nil {
-			
logger.Fatal("shutdown error: %v", err) - } - logger.Info("shutdown complete") case syscall.SIGKILL: logger.Info("received SIGKILL, server is shutting down immediately. In flight operations will not finish and their state is potentially lost.") diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index c54c082dd..6eb822170 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -51,17 +51,20 @@ var _ = Describe("Recovery From Broker Termination", func() { Expect(response.StatusCode).To(Equal(http.StatusAccepted)) Eventually(stdout, time.Second*5).Should(gbytes.Say(`tofu","apply","-auto-approve"`)) By("gracefully stopping the broker") + // Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped. + go func() { + Expect(broker.Stop()).To(Succeed()) + }() - Expect(broker.Stop()).To(Succeed()) + By("logging a message") + Eventually(stdout).Should(gbytes.Say("received SIGTERM")) By("ensuring that the broker rejects requests") Expect(broker.Client.LastOperation(instanceGUID, uuid.NewString()).Error).To(HaveOccurred()) - By("logging a message") - Expect(string(stdout.Contents())).To(ContainSubstring("received SIGTERM")) - Expect(string(stdout.Contents())).To(ContainSubstring("draining csb")) - Expect(string(stdout.Contents())).To(ContainSubstring("draining complete")) - Expect(string(stdout.Contents())).ToNot(ContainSubstring("shutdown error")) + Eventually(stdout, time.Minute).Should(Say("draining csb")) + Eventually(stdout, time.Minute).Should(Say("draining complete")) + Eventually(stdout, time.Minute).ShouldNot(Say("shutdown error")) broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr)))