Skip to content

Commit

Permalink
fix startQueryWithRetries - use correct total by loading connection c…
Browse files Browse the repository at this point in the history
…onfig (need to revert this as it will only work for local db?)
  • Loading branch information
kaidaguerre committed Apr 28, 2023
1 parent 30c2f3f commit 7beb4f0
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 137 deletions.
135 changes: 0 additions & 135 deletions pkg/db/db_client/db_client_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@ import (
"database/sql"
"fmt"
"net/netip"
"regexp"
"strings"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/sethvargo/go-retry"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/query/queryresult"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
"golang.org/x/text/language"
"golang.org/x/text/message"
Expand Down Expand Up @@ -250,136 +245,6 @@ func (c *DbClient) updateScanMetadataMaxId(ctx context.Context, session *db_comm
return nil
}

// startQueryWithRetries starts the query and, if it fails with a "relation not found" error,
// checks the connection state and connection config to decide whether the relation may be
// missing only because its schema has not loaded yet.
// If so, the query is retried at a constant interval (250ms) for up to 10 minutes.
// Any other error, or a relation which cannot be explained by a loading connection,
// is returned without retrying.
func (c *DbClient) startQueryWithRetries(ctx context.Context, conn *pgx.Conn, query string, args ...any) (pgx.Rows, error) {
	const retryInterval = 250 * time.Millisecond
	retryPolicy := retry.WithMaxDuration(10*time.Minute, retry.NewConstant(retryInterval))

	var result pgx.Rows
	err := retry.Do(ctx, retryPolicy, func(ctx context.Context) error {
		rows, queryErr := c.startQuery(ctx, conn, query, args...)
		if queryErr == nil {
			statushooks.SetStatus(ctx, "Loading results...")
			result = rows
			return nil
		}

		// only "relation not found" errors are candidates for a retry
		schema, _, notFound := isRelationNotFoundError(queryErr)
		if !notFound {
			return queryErr
		}

		// load the connection state and the connection config
		// if either cannot be loaded, or no connections are configured, report the query error
		states, configs, loadErr := loadConnectionConfigAndState(ctx, conn)
		if loadErr != nil || len(configs) == 0 {
			return queryErr
		}

		status := getLoadingConnectionStatusMessage(states, configs, schema)

		if schema != "" {
			// the query named a schema: it must be known to the connection state or the connection config
			if state, inState := states[schema]; inState {
				// give up if the connection is in error, or has been ready for longer than the retry interval
				// (a connection which has only just become ready is worth one more attempt)
				settled := state.State == constants.ConnectionStateError ||
					(state.State == constants.ConnectionStateReady && time.Since(state.ConnectionModTime) > retryInterval)
				if settled {
					return queryErr
				}
			} else if _, inConfig := configs[schema]; !inConfig {
				// not in the state map (which may not have been updated yet) and not in the config either
				return queryErr
			}
		} else if !states.Pending() && len(states) == len(configs) {
			// unqualified query: nothing is pending and the connection state has as many entries as
			// the connection config (i.e. it reflects any recent config change), so waiting will not help
			return queryErr
		}

		// a connection may still be loading - update the status and try again
		statushooks.SetStatus(ctx, status)
		return retry.RetryableError(queryErr)
	})

	return result, err
}

// loadConnectionConfigAndState loads the connection state (using the WithWaitForPending option)
// followed by the connection config, and returns the state map together with the
// configured connections.
// If either load fails, both maps are nil and the error is returned.
func loadConnectionConfigAndState(ctx context.Context, conn *pgx.Conn) (steampipeconfig.ConnectionDataMap, map[string]*modconfig.Connection, error) {
	stateMap, stateErr := steampipeconfig.LoadConnectionState(ctx, conn, steampipeconfig.WithWaitForPending)
	if stateErr != nil {
		return nil, nil, stateErr
	}

	config, loadResult := steampipeconfig.LoadConnectionConfig()
	configErr := loadResult.GetError()
	if configErr != nil {
		return nil, nil, configErr
	}

	return stateMap, config.Connections, nil
}

// getLoadingConnectionStatusMessage builds the status message shown while waiting for connections to load.
// The total is the number of entries in the connection config and the loaded count is the
// "ready" entry of the connection state summary.
// If missingSchema is set, the message names that connection and includes the progress in brackets.
func getLoadingConnectionStatusMessage(connectionStateMap steampipeconfig.ConnectionDataMap, connectionConfigMap map[string]*modconfig.Connection, missingSchema string) string {
	total := len(connectionConfigMap)
	ready := connectionStateMap.GetSummary()[constants.ConnectionStateReady]

	progress := fmt.Sprintf("Loaded %d of %d %s", ready, total, utils.Pluralize("connection", total))
	if missingSchema != "" {
		return fmt.Sprintf("Waiting for connection '%s' to load (%s)", missingSchema, progress)
	}
	return progress
}
// Patterns for the message of a PostgreSQL "relation does not exist" error.
// They are compiled once at package initialisation instead of on every call.
var (
	// schema-qualified relation: captures the schema and the table
	qualifiedRelationNotFoundRegexp = regexp.MustCompile(`^relation "(.*)\.(.*)" does not exist$`)
	// unqualified relation: captures the table
	unqualifiedRelationNotFoundRegexp = regexp.MustCompile(`^relation "(.*)" does not exist$`)
)

// isRelationNotFoundError reports whether err is a *pgconn.PgError with code 42P01
// (undefined_table in the PostgreSQL error code table).
//
// If it is, the schema and table parsed from the error message are returned along with true.
// The schema is empty if the relation was not schema-qualified, and both names are empty
// if the message does not have the expected format.
// For any other error (including nil) it returns "", "", false.
func isRelationNotFoundError(err error) (string, string, bool) {
	if err == nil {
		return "", "", false
	}
	pgErr, ok := err.(*pgconn.PgError)
	if !ok || pgErr.Code != "42P01" {
		return "", "", false
	}

	// try the schema-qualified form first
	if groups := qualifiedRelationNotFoundRegexp.FindStringSubmatch(pgErr.Message); len(groups) == 3 {
		return groups[1], groups[2], true
	}

	// maybe there is no schema
	if groups := unqualifiedRelationNotFoundRegexp.FindStringSubmatch(pgErr.Message); len(groups) == 2 {
		return "", groups[1], true
	}

	// the error code matched but the message could not be parsed
	return "", "", true
}

// run query in a goroutine, so we can check for cancellation
// in case the client becomes unresponsive and does not respect context cancellation
func (c *DbClient) startQuery(ctx context.Context, conn *pgx.Conn, query string, args ...any) (rows pgx.Rows, err error) {
Expand Down
127 changes: 127 additions & 0 deletions pkg/db/db_client/db_client_execute_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package db_client

import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/sethvargo/go-retry"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/utils"
"regexp"
"time"
)

// startQueryWithRetries starts the query and, if it fails with a "relation not found" error,
// uses the connection state to decide whether the relation may be missing only because
// its schema has not loaded yet.
// If so, the query is retried at a constant interval (250ms) for up to 10 minutes.
// Any other error, or a relation which cannot be explained by a loading connection,
// is returned without retrying.
func (c *DbClient) startQueryWithRetries(ctx context.Context, conn *pgx.Conn, query string, args ...any) (pgx.Rows, error) {
	const (
		retryWindow   = 10 * time.Minute
		retryInterval = 250 * time.Millisecond
	)
	policy := retry.WithMaxDuration(retryWindow, retry.NewConstant(retryInterval))

	var result pgx.Rows
	attempt := func(ctx context.Context) error {
		rows, queryErr := c.startQuery(ctx, conn, query, args...)
		if queryErr == nil {
			statushooks.SetStatus(ctx, "Loading results...")
			result = rows
			return nil
		}

		// only "relation not found" errors are candidates for a retry
		schema, _, isRelationNotFound := isRelationNotFoundError(queryErr)
		if !isRelationNotFound {
			return queryErr
		}

		// load the connection state - if that fails, or there are no connections at all,
		// report the original query error
		states, loadErr := steampipeconfig.LoadConnectionState(ctx, conn, steampipeconfig.WithWaitForPending)
		if loadErr != nil || len(states) == 0 {
			return queryErr
		}

		status := getLoadingConnectionStatusMessage(states, schema)

		if schema == "" {
			// unqualified query: keep waiting only while the connection state is pending
			if !states.Pending() {
				return queryErr
			}
		} else {
			// the query named a schema: it must be present in the connection state
			state, known := states[schema]
			if !known {
				return queryErr
			}
			// a connection in error will not recover by waiting
			if state.State == constants.ConnectionStateError {
				return queryErr
			}
			// a connection which has been ready for longer than the retry interval really does not have
			// the relation (if it has only just become ready, the query is attempted again)
			if state.State == constants.ConnectionStateReady && time.Since(state.ConnectionModTime) > retryInterval {
				return queryErr
			}
		}

		// a connection may still be loading - update the status and try again
		statushooks.SetStatus(ctx, status)
		return retry.RetryableError(queryErr)
	}

	err := retry.Do(ctx, policy, attempt)
	return result, err
}

// getLoadingConnectionStatusMessage builds the status message shown while waiting for connections to load.
// The total is the size of the connection state map less the "deleting" entry of its summary,
// and the loaded count is the "ready" entry of the summary.
// If missingSchema is set, the message names that connection and includes the progress in brackets.
func getLoadingConnectionStatusMessage(connectionStateMap steampipeconfig.ConnectionDataMap, missingSchema string) string {
	summary := connectionStateMap.GetSummary()

	ready := summary[constants.ConnectionStateReady]
	// entries in the deleting state are left out of the total
	total := len(connectionStateMap) - summary[constants.ConnectionStateDeleting]

	progress := fmt.Sprintf("Loaded %d of %d %s", ready, total, utils.Pluralize("connection", total))
	if missingSchema != "" {
		return fmt.Sprintf("Waiting for connection '%s' to load (%s)", missingSchema, progress)
	}
	return progress
}

// Patterns for the message of a PostgreSQL "relation does not exist" error.
// They are compiled once at package initialisation instead of on every call.
var (
	// schema-qualified relation: captures the schema and the table
	qualifiedRelationNotFoundRegexp = regexp.MustCompile(`^relation "(.*)\.(.*)" does not exist$`)
	// unqualified relation: captures the table
	unqualifiedRelationNotFoundRegexp = regexp.MustCompile(`^relation "(.*)" does not exist$`)
)

// isRelationNotFoundError reports whether err is a *pgconn.PgError with code 42P01
// (undefined_table in the PostgreSQL error code table).
//
// If it is, the schema and table parsed from the error message are returned along with true.
// The schema is empty if the relation was not schema-qualified, and both names are empty
// if the message does not have the expected format.
// For any other error (including nil) it returns "", "", false.
func isRelationNotFoundError(err error) (string, string, bool) {
	if err == nil {
		return "", "", false
	}
	pgErr, ok := err.(*pgconn.PgError)
	if !ok || pgErr.Code != "42P01" {
		return "", "", false
	}

	// try the schema-qualified form first
	if groups := qualifiedRelationNotFoundRegexp.FindStringSubmatch(pgErr.Message); len(groups) == 3 {
		return groups[1], groups[2], true
	}

	// maybe there is no schema
	if groups := unqualifiedRelationNotFoundRegexp.FindStringSubmatch(pgErr.Message); len(groups) == 2 {
		return "", groups[1], true
	}

	// the error code matched but the message could not be parsed
	return "", "", true
}
8 changes: 6 additions & 2 deletions pkg/steampipeconfig/connection_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ func LoadConnectionState(ctx context.Context, conn *pgx.Conn, opts ...LoadConnec
opt(config)
}
// max duration depends on if waiting for ready or just pending
maxDuration := 20 * time.Second
// default value is if we are waiting for pending
// set this to a long enough time for ConnectionUpdates to be generated for a large connection count
// TODO this time can be reduced once all plugins are using v5.4.1 of the sdk
maxDuration := 1 * time.Minute
if config.WaitForReady {
maxDuration = 5 * time.Minute
// if we are waiting for all connections to be ready, wait up to 10 minutes
maxDuration = 10 * time.Minute
}
retryInterval := 50 * time.Millisecond
backoff := retry.NewConstant(retryInterval)
Expand Down

0 comments on commit 7beb4f0

Please sign in to comment.