diff --git a/pkg/db/db_client/db_client_execute.go b/pkg/db/db_client/db_client_execute.go index b73d42902a..6d7337e2af 100644 --- a/pkg/db/db_client/db_client_execute.go +++ b/pkg/db/db_client/db_client_execute.go @@ -5,15 +5,12 @@ import ( "database/sql" "fmt" "net/netip" - "regexp" "strings" "time" "github.com/google/uuid" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" - "github.com/sethvargo/go-retry" "github.com/spf13/viper" "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe/pkg/constants" @@ -21,8 +18,6 @@ import ( "github.com/turbot/steampipe/pkg/error_helpers" "github.com/turbot/steampipe/pkg/query/queryresult" "github.com/turbot/steampipe/pkg/statushooks" - "github.com/turbot/steampipe/pkg/steampipeconfig" - "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" "github.com/turbot/steampipe/pkg/utils" "golang.org/x/text/language" "golang.org/x/text/message" @@ -250,136 +245,6 @@ func (c *DbClient) updateScanMetadataMaxId(ctx context.Context, session *db_comm return nil } -// TODO KAI COMMENT -func (c *DbClient) startQueryWithRetries(ctx context.Context, conn *pgx.Conn, query string, args ...any) (pgx.Rows, error) { - maxDuration := 10 * time.Minute - backoffInterval := 250 * time.Millisecond - backoff := retry.NewConstant(backoffInterval) - - var res pgx.Rows - err := retry.Do(ctx, retry.WithMaxDuration(maxDuration, backoff), func(ctx context.Context) error { - rows, queryError := c.startQuery(ctx, conn, query, args...) - if queryError == nil { - statushooks.SetStatus(ctx, "Loading results...") - res = rows - return nil - } - - missingSchema, _, relationNotFound := isRelationNotFoundError(queryError) - if !relationNotFound { - return queryError - } - // so this _was_ a relation not found error - // load the connection state and connection config to see if the missing schema is in there at all - // if there was a schema not found with an unqualified query, we keep trying until ALL the schemas have loaded - - connectionStateMap, connectionConfigMap, stateErr := loadConnectionConfigAndState(ctx, conn) - if stateErr != nil { - // just return the query error - return queryError - } - // if there are no connections, just return the error - if len(connectionConfigMap) == 0 { - return queryError - } - - statusMessage := getLoadingConnectionStatusMessage(connectionStateMap, connectionConfigMap, missingSchema) - - // if a schema was specified, verify it exists in the connection state or connection config - - if missingSchema != "" { - connectionState, missingSchemaExistsInStateMap := connectionStateMap[missingSchema] - if missingSchemaExistsInStateMap { - // if connection is in error or has been ready for more than the backoff interval, do not retry - // (in other words, if it has only just become ready, then retry the query) - if connectionState.State == constants.ConnectionStateError || - connectionState.State == constants.ConnectionStateReady && time.Since(connectionState.ConnectionModTime) > backoffInterval { - return queryError - } - } else { - // missing schema is not in connection state map - it may not have updated yet - - _, missingSchemaExistsInConnectionConfig := connectionConfigMap[missingSchema] - if !missingSchemaExistsInConnectionConfig { - // schema is not in connection config either - just return relation not found error - return queryError - } - } - } else { - // if no schema was specified, return if the conneciton state is not pending - // (and has same length as connection config, - // i.e. it has been updated to reflect recent config change) - if !connectionStateMap.Pending() && len(connectionStateMap) == len(connectionConfigMap) { - return queryError - } - - // otherwise we need to wait for everything to load - } - - statushooks.SetStatus(ctx, statusMessage) - - // retry - return retry.RetryableError(queryError) - }) - - return res, err -} - -func loadConnectionConfigAndState(ctx context.Context, conn *pgx.Conn) (steampipeconfig.ConnectionDataMap, map[string]*modconfig.Connection, error) { - connectionStateMap, err := steampipeconfig.LoadConnectionState(ctx, conn, steampipeconfig.WithWaitForPending) - if err != nil { - return nil, nil, err - } - - connectionConfig, errorsAndWarnings := steampipeconfig.LoadConnectionConfig() - if err := errorsAndWarnings.GetError(); err != nil { - return nil, nil, err - } - return connectionStateMap, connectionConfig.Connections, nil -} - -func getLoadingConnectionStatusMessage(connectionStateMap steampipeconfig.ConnectionDataMap, connectionConfigMap map[string]*modconfig.Connection, missingSchema string) string { - var connectionSummary = connectionStateMap.GetSummary() - - readyCount := connectionSummary[constants.ConnectionStateReady] - totalCount := len(connectionConfigMap) - - loadedMessage := fmt.Sprintf("Loaded %d of %d %s", - readyCount, - totalCount, - utils.Pluralize("connection", totalCount)) - - if missingSchema == "" { - return loadedMessage - } - - return fmt.Sprintf("Waiting for connection '%s' to load (%s)", missingSchema, loadedMessage) -} -func isRelationNotFoundError(err error) (string, string, bool) { - if err == nil { - return "", "", false - } - pgErr, ok := err.(*pgconn.PgError) - if !ok || pgErr.Code != "42P01" { - return "", "", false - } - - r := regexp.MustCompile(`^relation "(.*)\.(.*)" does not exist$`) - captureGroups := r.FindStringSubmatch(pgErr.Message) - if len(captureGroups) == 3 { - - return captureGroups[1], captureGroups[2], true - } - - // maybe there is no schema - r = regexp.MustCompile(`^relation "(.*)" does not exist$`) - captureGroups = r.FindStringSubmatch(pgErr.Message) - if len(captureGroups) == 2 { - return "", captureGroups[1], true - } - return "", "", true -} - // run query in a goroutine, so we can check for cancellation // in case the client becomes unresponsive and does not respect context cancellation func (c *DbClient) startQuery(ctx context.Context, conn *pgx.Conn, query string, args ...any) (rows pgx.Rows, err error) { diff --git a/pkg/db/db_client/db_client_execute_retry.go b/pkg/db/db_client/db_client_execute_retry.go new file mode 100644 index 0000000000..5aecae8624 --- /dev/null +++ b/pkg/db/db_client/db_client_execute_retry.go @@ -0,0 +1,127 @@ +package db_client + +import ( + "context" + "fmt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/sethvargo/go-retry" + "github.com/turbot/steampipe/pkg/constants" + "github.com/turbot/steampipe/pkg/statushooks" + "github.com/turbot/steampipe/pkg/steampipeconfig" + "github.com/turbot/steampipe/pkg/utils" + "regexp" + "time" +) + +// execute query - if it fails with a "relation not found" error, determine whether this is because the required schema +// has not yet loaded and if so, wait for it to load and retry +func (c *DbClient) startQueryWithRetries(ctx context.Context, conn *pgx.Conn, query string, args ...any) (pgx.Rows, error) { + maxDuration := 10 * time.Minute + backoffInterval := 250 * time.Millisecond + backoff := retry.NewConstant(backoffInterval) + + var res pgx.Rows + err := retry.Do(ctx, retry.WithMaxDuration(maxDuration, backoff), func(ctx context.Context) error { + rows, queryError := c.startQuery(ctx, conn, query, args...) + if queryError == nil { + statushooks.SetStatus(ctx, "Loading results...") + res = rows + return nil + } + + missingSchema, _, relationNotFound := isRelationNotFoundError(queryError) + if !relationNotFound { + return queryError + } + // so this _was_ a relation not found error + // load the connection state and connection config to see if the missing schema is in there at all + // if there was a schema not found with an unqualified query, we keep trying until ALL the schemas have loaded + + connectionStateMap, stateErr := steampipeconfig.LoadConnectionState(ctx, conn, steampipeconfig.WithWaitForPending) + if stateErr != nil { + // just return the query error + return queryError + } + // if there are no connections, just return the error + if len(connectionStateMap) == 0 { + return queryError + } + + statusMessage := getLoadingConnectionStatusMessage(connectionStateMap, missingSchema) + + // if a schema was specified, verify it exists in the connection state or connection config + + if missingSchema != "" { + connectionState, missingSchemaExistsInStateMap := connectionStateMap[missingSchema] + if missingSchemaExistsInStateMap { + // if connection is in error or has been ready for more than the backoff interval, do not retry + // (in other words, if it has only just become ready, then retry the query) + if connectionState.State == constants.ConnectionStateError || + connectionState.State == constants.ConnectionStateReady && time.Since(connectionState.ConnectionModTime) > backoffInterval { + return queryError + } + } else { + // missing schema is not in connection state map - just return the error + return queryError + } + } else { + // if no schema was specified, return if the connection state is not pending + if !connectionStateMap.Pending() { + return queryError + } + + // otherwise we need to wait for everything to load + } + + statushooks.SetStatus(ctx, statusMessage) + + // retry + return retry.RetryableError(queryError) + }) + + return res, err +} + +func getLoadingConnectionStatusMessage(connectionStateMap steampipeconfig.ConnectionDataMap, missingSchema string) string { + var connectionSummary = connectionStateMap.GetSummary() + + readyCount := connectionSummary[constants.ConnectionStateReady] + totalCount := len(connectionStateMap) - connectionSummary[constants.ConnectionStateDeleting] + + loadedMessage := fmt.Sprintf("Loaded %d of %d %s", + readyCount, + totalCount, + utils.Pluralize("connection", totalCount)) + + if missingSchema == "" { + return loadedMessage + } + + return fmt.Sprintf("Waiting for connection '%s' to load (%s)", missingSchema, loadedMessage) +} + +func isRelationNotFoundError(err error) (string, string, bool) { + if err == nil { + return "", "", false + } + pgErr, ok := err.(*pgconn.PgError) + if !ok || pgErr.Code != "42P01" { + return "", "", false + } + + r := regexp.MustCompile(`^relation "(.*)\.(.*)" does not exist$`) + captureGroups := r.FindStringSubmatch(pgErr.Message) + if len(captureGroups) == 3 { + + return captureGroups[1], captureGroups[2], true + } + + // maybe there is no schema + r = regexp.MustCompile(`^relation "(.*)" does not exist$`) + captureGroups = r.FindStringSubmatch(pgErr.Message) + if len(captureGroups) == 2 { + return "", captureGroups[1], true + } + return "", "", true +} diff --git a/pkg/steampipeconfig/connection_state.go b/pkg/steampipeconfig/connection_state.go index 6c985e42c5..5e4dbe6b7e 100644 --- a/pkg/steampipeconfig/connection_state.go +++ b/pkg/steampipeconfig/connection_state.go @@ -34,9 +34,13 @@ func LoadConnectionState(ctx context.Context, conn *pgx.Conn, opts ...LoadConnec opt(config) } // max duration depends on if waiting for ready or just pending - maxDuration := 20 * time.Second + // default value is if we are waiting for pending + // set this to a long enough time for ConnectionUpdates to be generated for a large connection count + // TODO this time can be reduced once all; plugins are using v5.4.1 of the sdk + maxDuration := 1 * time.Minute if config.WaitForReady { - maxDuration = 5 * time.Minute + // is we are waiting for all connections to be ready, wait up to 10 minutes + maxDuration = 10 * time.Minute } retryInterval := 50 * time.Millisecond backoff := retry.NewConstant(retryInterval)