Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable drain and running on vm #1097

Merged
merged 6 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions brokerapi/broker/brokerfakes/fake_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 61 additions & 18 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"code.cloudfoundry.org/lager/v3"
osbapiBroker "github.com/cloudfoundry/cloud-service-broker/v2/brokerapi/broker"
Expand Down Expand Up @@ -55,6 +59,8 @@ const (
tlsKeyProp = "api.tlsKey"
encryptionPasswords = "db.encryption.passwords"
encryptionEnabled = "db.encryption.enabled"

shutdownTimeout = time.Hour
)

var cfCompatibilityToggle = toggles.Features.Toggle("enable-cf-sharing", false, `Set all services to have the Sharable flag so they can be shared
Expand Down Expand Up @@ -102,7 +108,13 @@ func serve() {
logger.Fatal("Error initializing service broker config", err)
}
var serviceBroker domain.ServiceBroker
serviceBroker, err = osbapiBroker.New(cfg, storage.New(db, encryptor), logger)
csbStore := storage.New(db, encryptor)
err = csbStore.RecoverInProgressOperations(logger)
if err != nil {
logger.Fatal("Error recovering in-progress operations", err)
}

serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger)
if err != nil {
logger.Fatal("Error initializing service broker", err)
}
Expand Down Expand Up @@ -133,7 +145,9 @@ func serve() {
if err != nil {
logger.Error("failed to get database connection", err)
}
startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), credentials)
httpServer := startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials)

listenForShutdownSignal(httpServer, logger, csbStore)
}

func serveDocs() {
Expand Down Expand Up @@ -188,7 +202,7 @@ func setupDBEncryption(db *gorm.DB, logger lager.Logger) storage.Encryptor {
return config.Encryptor
}

func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.Handler, store *storage.Storage, credentials brokerapi.BrokerCredentials) {
func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.Handler, store *storage.Storage, credentials brokerapi.BrokerCredentials) *http.Server {
logger := utils.NewLogger("cloud-service-broker")

docsHandler := server.DocsHandler(registry)
Expand Down Expand Up @@ -225,24 +239,44 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H
Addr: fmt.Sprintf("%s:%s", host, port),
Handler: router,
}
var err error
if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" {
err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath)
} else {
err = httpServer.ListenAndServe()
}
// when the server is receiving a signal, we probably do not want to panic.
if err != http.ErrServerClosed {
logger.Fatal("Failed to start broker", err)
}
go func() {
var err error
if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" {
err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath)
} else {
err = httpServer.ListenAndServe()
}
if err == http.ErrServerClosed {
logger.Info("shutting down csb")
} else {
logger.Fatal("Failed to start broker", err)
}
}()
return httpServer
}

func labelName(label string) string {
switch label {
case "":
return "none"
func listenForShutdownSignal(httpServer *http.Server, logger lager.Logger, store *storage.Storage) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)

signalReceived := <-sigChan

switch signalReceived {

case syscall.SIGTERM:
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout)
if err := httpServer.Shutdown(shutdownCtx); err != nil {
logger.Fatal("shutdown error: %v", err)
}
logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish")
defer shutdownRelease()
for store.LockFilesExist() {
logger.Info("draining csb in progress")
time.Sleep(time.Second * 1)
}
logger.Info("draining complete")
default:
return label
logger.Info(fmt.Sprintf("csb does not handle the %s interrupt signal", signalReceived))
}
}

Expand Down Expand Up @@ -288,3 +322,12 @@ func importStateHandler(store *storage.Storage) http.Handler {
}
})
}

func labelName(label string) string {
switch label {
case "":
return "none"
default:
return label
}
}
3 changes: 0 additions & 3 deletions dbservice/dbservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB {
if err := RunMigrations(db); err != nil {
panic(fmt.Sprintf("Error migrating database: %s", err))
}
if err := recoverInProgressOperations(db, logger); err != nil {
panic(fmt.Sprintf("Error recovering in-progress operations: %s", err))
}
})
return db
}
24 changes: 0 additions & 24 deletions dbservice/recover_in_progress_operations.go

This file was deleted.

Loading
Loading