diff --git a/Dockerfile b/Dockerfile index 0e1867e95..1841ab3dc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ # This Dockerfile builds the InterUSS `dss` image which contains the binary -# executables for the core-service and the db-manager. It also +# executables for the core-service, the db-manager and the db-evictor. It also # contains a light weight tool that provides debugging capability. To run a # container for this image, the desired binary must be specified (either -# /usr/bin/core-service or /usr/bin/db-manager). +# /usr/bin/core-service, /usr/bin/db-manager, or /usr/bin/db-evictor). FROM golang:1.22-alpine AS build RUN apk add build-base @@ -27,6 +27,7 @@ FROM alpine:latest RUN apk update && apk add ca-certificates COPY --from=build /go/bin/core-service /usr/bin COPY --from=build /go/bin/db-manager /usr/bin +COPY --from=build /go/bin/db-evictor /usr/bin COPY --from=build /go/bin/dlv /usr/bin COPY build/jwt-public-certs /jwt-public-certs COPY build/test-certs /test-certs diff --git a/cmds/db-evictor/main.go b/cmds/db-evictor/main.go new file mode 100644 index 000000000..06a96ed72 --- /dev/null +++ b/cmds/db-evictor/main.go @@ -0,0 +1,107 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "time" + + "github.com/interuss/dss/pkg/cockroach" + "github.com/interuss/dss/pkg/cockroach/flags" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/dss/pkg/scd/repos" + scdc "github.com/interuss/dss/pkg/scd/store/cockroach" +) + +// Command-line flags selecting which entity kinds are listed, the expiration TTL, and whether expired entities are deleted. +var ( + listOpIntents = flag.Bool("op_intents", true, "set this flag to true to list expired operational intents") + listScdSubs = flag.Bool("scd_subs", true, "set this flag to true to list expired SCD subscriptions") + ttl = flag.Duration("ttl", time.Hour*24*112, "time-to-live duration used for determining expiration") + deleteExpired = flag.Bool("delete", false, "set this flag to true to delete the expired entities") +) + +// main connects to the SCD database, lists the operational intents and SCD subscriptions reported as expired for +// the threshold "now minus -ttl", deletes them when -delete is set, and logs every entity found. +// Any connection, store or transaction error makes the program panic through log.Panicf. +func main() { + 
flag.Parse() + + var ( + ctx = context.Background() + // threshold is the instant handed to the ListExpired* calls: now minus the -ttl duration + threshold = time.Now().Add(-*ttl) + ) + + connectParameters := flags.ConnectParameters() + connectParameters.ApplicationName = "db-evictor" + connectParameters.DBName = scdc.DatabaseName + scdCrdb, err := cockroach.Dial(ctx, connectParameters) + if err != nil { + log.Panicf("Failed to connect to database with %+v: %v", connectParameters, err) + } + + scdStore, err := scdc.NewStore(ctx, scdCrdb) + if err != nil { + log.Panicf("Failed to create strategic conflict detection store with %+v: %v", connectParameters, err) + } + + var ( + expiredOpIntents []*scdmodels.OperationalIntent + expiredSubs []*scdmodels.Subscription + ) + // action is executed by scdStore.Transact below; it stores what it listed in expiredOpIntents and expiredSubs + // so that the entities can be logged once Transact has returned. + action := func(ctx context.Context, r repos.Repository) (err error) { + if *listOpIntents { + expiredOpIntents, err = r.ListExpiredOperationalIntents(ctx, threshold) + if err != nil { + return fmt.Errorf("listing expired operational intents: %w", err) + } + if *deleteExpired { + for _, opIntent := range expiredOpIntents { + if err = r.DeleteOperationalIntent(ctx, opIntent.ID); err != nil { + return fmt.Errorf("deleting expired operational intents: %w", err) + } + } + } + } + + if *listScdSubs { + expiredSubs, err = r.ListExpiredSubscriptions(ctx, threshold) + if err != nil { + return fmt.Errorf("listing expired subscriptions: %w", err) + } + if *deleteExpired { + for _, sub := range expiredSubs { + if err = r.DeleteSubscription(ctx, sub.ID); err != nil { + return fmt.Errorf("deleting expired subscriptions: %w", err) + } + } + } + } + + return nil + } + if err = scdStore.Transact(ctx, action); err != nil { + log.Panicf("Failed to execute CRDB transaction: %v", err) + } + + // Reached only when Transact returned no error, since log.Panicf above does not return. + for _, opIntent := range expiredOpIntents { + logExpiredEntity("operational intent", opIntent.ID, threshold, *deleteExpired, opIntent.EndTime != nil) + } + for _, sub := range expiredSubs { + logExpiredEntity("subscription", sub.ID, threshold, *deleteExpired, sub.EndTime != nil) + } + if len(expiredOpIntents) == 0 && 
len(expiredSubs) == 0 { + log.Printf("no entity older than %s found", threshold.String()) + } +} + +// logExpiredEntity logs one expired entity. The line starts with "deleted" when deleted is true and "found" otherwise, +// and states whether expiration was derived from the end time (hasEndTime) or from the last update time. +func logExpiredEntity(entity string, entityID dssmodels.ID, threshold time.Time, deleted, hasEndTime bool) { + logMsg := "found" + if deleted { + logMsg = "deleted" + } + + expMsg := "last update before %s (missing end time)" + if hasEndTime { + expMsg = "end time before %s" + } + log.Printf("%s %s %s; expired due to %s", logMsg, entity, entityID.String(), fmt.Sprintf(expMsg, threshold.String())) +} diff --git a/cmds/db-manager/cleanup/README.md b/cmds/db-manager/cleanup/README.md new file mode 100644 index 000000000..af8112dd9 --- /dev/null +++ b/cmds/db-manager/cleanup/README.md @@ -0,0 +1,67 @@ +# DB Cleanup + +## scd-evict +CLI tool that lists and deletes expired entities in the DSS store. +At the time of writing this README, the entities supported by this tool are: +- SCD operational intents; +- SCD subscriptions. + +The usage of this tool is potentially dangerous: inputting wrong parameters may result in loss of data. 
+As such, it is strongly recommended to always review and validate the list of entities identified as expired, and to +ensure that a backup of the data is available before deleting anything using the `--delete` flag. + +### Usage +Extract from running `db-manager scd-evict --help`: +``` +List and evict SCD expired entities + +Usage: + db-manager scd-evict [flags] + +Flags: + --delete set this flag to true to delete the expired entities + -h, --help help for scd-evict + --op_intents set this flag to true to list expired operational intents (default true) + --scd_subs set this flag to true to list expired SCD subscriptions (default true) + --ttl duration time-to-live duration used for determining expiration, defaults to 2*56 days which should be a safe value in most cases (default 2688h0m0s) + +Global Flags: + --cockroach_application_name string application name for tagging the connection to cockroach (default "dss") + --cockroach_db_name string application name for tagging the connection to cockroach (default "dss") + --cockroach_host string cockroach host to connect to + --cockroach_max_retries int maximum number of attempts to retry a query in case of contention, default is 100 (default 100) + --cockroach_port int cockroach port to connect to (default 26257) + --cockroach_ssl_dir string directory to ssl certificates. 
Must contain files: ca.crt, client.<user>.crt, client.<user>.key + --cockroach_ssl_mode string cockroach sslmode (default "disable") + --cockroach_user string cockroach user to authenticate as (default "root") + --max_conn_idle_secs int maximum amount of time in seconds a connection may be idle, default is 30 seconds (default 30) + --max_open_conns int maximum number of open connections to the database, default is 4 (default 4) + +``` + +Do note: +- by default, expired entities are only listed, not deleted: the flag `--delete` is required for deleting entities; +- expiration of entities is preferably determined through their end times, however when they do not have end times, the last update times are used; +- the flag `--ttl` accepts durations formatted as [Go `time.Duration` strings](https://pkg.go.dev/time#ParseDuration), e.g. `24h`; +- the CockroachDB cluster connection flags are the same as those of [the `core-service` command](../core-service/README.md). + +## Examples +The following examples assume a running DSS deployed locally through [the `run_locally.sh` script](../../build/dev/standalone_instance.md). 
+ +### List all entities older than 1 week +```shell +docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-evictor \ + -cockroach_host=local-dss-crdb -ttl=168h +``` + +### List operational intents older than 1 week +```shell +docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-evictor \ + -cockroach_host=local-dss-crdb -ttl=168h -op_intents=true -scd_subs=false +``` + +### Delete all entities older than 30 days +```shell +docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-evictor \ + -cockroach_host=local-dss-crdb -ttl=720h -delete +``` diff --git a/cmds/db-manager/cleanup/scd-evict.go b/cmds/db-manager/cleanup/scd-evict.go new file mode 100644 index 000000000..8fcd84500 --- /dev/null +++ b/cmds/db-manager/cleanup/scd-evict.go @@ -0,0 +1,127 @@ +package cleanup + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/interuss/dss/pkg/cockroach" + crdbflags "github.com/interuss/dss/pkg/cockroach/flags" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/dss/pkg/scd/repos" + scdc "github.com/interuss/dss/pkg/scd/store/cockroach" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +var ( + // ScdEvictCmd is the "scd-evict" command: it lists and, on request, deletes expired SCD entities. + ScdEvictCmd = &cobra.Command{ + Use: "scd-evict", + Short: "List and evict SCD expired entities", + RunE: scdEvict, + } + // flags holds the options specific to scd-evict; init attaches them to ScdEvictCmd. + flags = pflag.NewFlagSet("scd-evict", pflag.ExitOnError) + listOpIntents = flags.Bool("op_intents", true, "set this flag to true to list expired operational intents") + listScdSubs = flags.Bool("scd_subs", true, "set this flag to true to list expired SCD subscriptions") + ttl = flags.Duration("ttl", time.Hour*24*112, "time-to-live duration used for determining expiration, defaults to 2*56 days which should be a safe value in most cases") + deleteExpired = flags.Bool("delete", false, "set this flag to true to delete the expired entities") +) + +// init adds the scd-evict flag set to ScdEvictCmd. +func init() { + 
ScdEvictCmd.Flags().AddFlagSet(flags) +} + +// scdEvict is the RunE handler of ScdEvictCmd. It lists the operational intents and SCD subscriptions reported as +// expired for the threshold "now minus --ttl", deletes them when --delete is set, and logs every entity found. +// Listing and deletion both run inside a single scdStore.Transact call; any error is returned to the caller. +func scdEvict(cmd *cobra.Command, _ []string) error { + var ( + ctx = cmd.Context() + threshold = time.Now().Add(-*ttl) + ) + + scdStore, err := getSCDStore(ctx) + if err != nil { + return err + } + + var ( + expiredOpIntents []*scdmodels.OperationalIntent + expiredSubs []*scdmodels.Subscription + ) + // action stores what it listed in expiredOpIntents and expiredSubs so that the entities can be logged + // once Transact has returned. + action := func(ctx context.Context, r repos.Repository) (err error) { + if *listOpIntents { + expiredOpIntents, err = r.ListExpiredOperationalIntents(ctx, threshold) + if err != nil { + return fmt.Errorf("listing expired operational intents: %w", err) + } + if *deleteExpired { + for _, opIntent := range expiredOpIntents { + if err = r.DeleteOperationalIntent(ctx, opIntent.ID); err != nil { + return fmt.Errorf("deleting expired operational intents: %w", err) + } + } + } + } + + if *listScdSubs { + expiredSubs, err = r.ListExpiredSubscriptions(ctx, threshold) + if err != nil { + return fmt.Errorf("listing expired subscriptions: %w", err) + } + if *deleteExpired { + for _, sub := range expiredSubs { + if err = r.DeleteSubscription(ctx, sub.ID); err != nil { + return fmt.Errorf("deleting expired subscriptions: %w", err) + } + } + } + } + + return nil + } + if err = scdStore.Transact(ctx, action); err != nil { + return fmt.Errorf("failed to execute CRDB transaction: %w", err) + } + + for _, opIntent := range expiredOpIntents { + logExpiredEntity("operational intent", opIntent.ID, threshold, *deleteExpired, opIntent.EndTime != nil) + } + for _, sub := range expiredSubs { + logExpiredEntity("subscription", sub.ID, threshold, *deleteExpired, sub.EndTime != nil) + } + if len(expiredOpIntents) == 0 && len(expiredSubs) == 0 { + log.Printf("no entity older than %s found", threshold.String()) + } else if !*deleteExpired { + // The flags are registered through pflag, where a single dash only introduces shorthand letters: + // the hint must therefore spell the long flag with a double dash. + log.Printf("no entity was deleted, run the command again with the `--delete` flag to do so") + } + return nil +} + +// getSCDStore dials CockroachDB with the connection flags, forcing the application name to "db-manager" and the +// database name to scdc.DatabaseName, and returns an SCD store built on that connection. +func getSCDStore(ctx context.Context) (*scdc.Store, error) { + connectParameters := 
crdbflags.ConnectParameters() + connectParameters.ApplicationName = "db-manager" + connectParameters.DBName = scdc.DatabaseName + scdCrdb, err := cockroach.Dial(ctx, connectParameters) + if err != nil { + return nil, fmt.Errorf("failed to connect to database with %+v: %w", connectParameters, err) + } + + scdStore, err := scdc.NewStore(ctx, scdCrdb) + if err != nil { + return nil, fmt.Errorf("failed to create strategic conflict detection store with %+v: %w", connectParameters, err) + } + return scdStore, nil +} + +// logExpiredEntity logs one expired entity. The line starts with "deleted" when deleted is true and "found" otherwise, +// and states whether expiration was derived from the end time (hasEndTime) or from the last update time. +func logExpiredEntity(entity string, entityID dssmodels.ID, threshold time.Time, deleted, hasEndTime bool) { + logMsg := "found" + if deleted { + logMsg = "deleted" + } + + expMsg := "last update before %s (missing end time)" + if hasEndTime { + expMsg = "end time before %s" + } + log.Printf("%s %s %s; expired due to %s", logMsg, entity, entityID.String(), fmt.Sprintf(expMsg, threshold.String())) +} diff --git a/cmds/db-manager/main.go b/cmds/db-manager/main.go index 37052232f..96a4aa886 100644 --- a/cmds/db-manager/main.go +++ b/cmds/db-manager/main.go @@ -5,6 +5,7 @@ import ( "log" "os" + "github.com/interuss/dss/cmds/db-manager/cleanup" "github.com/interuss/dss/cmds/db-manager/migration" "github.com/spf13/cobra" ) @@ -19,6 +20,7 @@ var ( func init() { DBManagerCmd.PersistentFlags().AddGoFlagSet(flag.CommandLine) // enable support for flags not yet migrated to using pflag (e.g. 
crdb flags) DBManagerCmd.AddCommand(migration.MigrationCmd) + DBManagerCmd.AddCommand(cleanup.ScdEvictCmd) } func main() { diff --git a/pkg/scd/repos/repos.go b/pkg/scd/repos/repos.go index ab1cc4a9f..fc84eb74a 100644 --- a/pkg/scd/repos/repos.go +++ b/pkg/scd/repos/repos.go @@ -2,6 +2,8 @@ package repos import ( "context" + "time" + "github.com/golang/geo/s2" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" @@ -27,6 +29,10 @@ type OperationalIntent interface { // GetDependentOperationalIntents returns IDs of all operations dependent on // subscription identified by "subscriptionID". GetDependentOperationalIntents(ctx context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) + + // ListExpiredOperationalIntents lists all operational intents older than the threshold. + // Their age is determined by their end time, or by their update time if they do not have an end time. + ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) } // Subscription abstracts subscription-specific interactions with the backing repository. @@ -54,6 +60,10 @@ type Subscription interface { // LockSubscriptionsOnCells locks the subscriptions of interest on specific cells. LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error + + // ListExpiredSubscriptions lists all subscriptions older than the threshold. + // Their age is determined by their end time, or by their update time if they do not have an end time. 
+ ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) } type UssAvailability interface { diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index 90b3082c7..dfd410d14 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -341,3 +341,29 @@ func (s *repo) GetDependentOperationalIntents(ctx context.Context, subscriptionI return dependentOps, nil } + +// ListExpiredOperationalIntents lists all operational intents older than the threshold. +// Their age is determined by their end time, or by their last update time if they do not have an end time. +// An entity whose reference time equals the threshold is included (the query compares with <=). +func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { + // In SQL, AND binds tighter than OR, so the WHERE clause reads: + // (ends_at is set AND ends_at <= $1) OR (ends_at is NULL AND updated_at <= $1). + expiredOpIntentsQuery := fmt.Sprintf(` + SELECT + %s + FROM + scd_operations + WHERE + scd_operations.ends_at IS NOT NULL AND scd_operations.ends_at <= $1 + OR + scd_operations.ends_at IS NULL AND scd_operations.updated_at <= $1 -- use last update time as reference if there is no end time + LIMIT $2`, operationFieldsWithPrefix) + + // threshold and dssmodels.MaxResultLimit are the query arguments, in that order. + // NOTE(review): presumably bound to $1 and $2 by fetchOperationalIntents, which would cap the result size - confirm. + result, err := s.fetchOperationalIntents( + ctx, s.q, expiredOpIntentsQuery, + threshold, + dssmodels.MaxResultLimit, + ) + if err != nil { + return nil, stacktrace.Propagate(err, "Error fetching Operations") + } + + return result, nil +} diff --git a/pkg/scd/store/cockroach/subscriptions.go b/pkg/scd/store/cockroach/subscriptions.go index 7cc7e9149..811e1e451 100644 --- a/pkg/scd/store/cockroach/subscriptions.go +++ b/pkg/scd/store/cockroach/subscriptions.go @@ -377,3 +377,30 @@ func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) return nil } + +// ListExpiredSubscriptions lists all subscriptions older than the threshold. +// Their age is determined by their end time, or by their update time if they do not have an end time. 
+func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { + // In SQL, AND binds tighter than OR, so the WHERE clause reads: + // (ends_at is set AND ends_at <= $1) OR (ends_at is NULL AND updated_at <= $1). + expiredSubsQuery := fmt.Sprintf(` + SELECT + %s + FROM + scd_subscriptions + WHERE + scd_subscriptions.ends_at IS NOT NULL AND scd_subscriptions.ends_at <= $1 + OR + scd_subscriptions.ends_at IS NULL AND scd_subscriptions.updated_at <= $1 -- use last update time as reference if there is no end time + LIMIT $2`, subscriptionFieldsWithPrefix) + + // threshold and dssmodels.MaxResultLimit are the query arguments, in that order. + // NOTE(review): presumably bound to $1 and $2 by fetchSubscriptions, which would cap the result size - confirm. + subscriptions, err := c.fetchSubscriptions( + ctx, c.q, expiredSubsQuery, + threshold, + dssmodels.MaxResultLimit, + ) + if err != nil { + return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions") + } + + return subscriptions, nil + +}