diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 28399102a..e347fa38d 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -4,6 +4,7 @@ import ( "bytes" "context" sha "crypto/sha256" + "database/sql" "encoding/binary" "encoding/json" "errors" @@ -25,7 +26,6 @@ import ( "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" "github.com/jellydator/ttlcache/v3" "github.com/jpillora/backoff" "golang.org/x/exp/maps" @@ -147,7 +147,7 @@ func (c *Config) SetDefaults() { } // Start the Controller. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, pool *pgxpool.Pool, encryptors *dal.Encryptors) error { +func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB, encryptors *dal.Encryptors) error { config.SetDefaults() logger := log.FromContext(ctx) @@ -168,7 +168,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali logger.Infof("Web console available at: %s", config.Bind) } - svc, err := New(ctx, pool, config, runnerScaling, encryptors) + svc, err := New(ctx, conn, config, runnerScaling, encryptors) if err != nil { return err } @@ -226,7 +226,7 @@ type ControllerListListener interface { } type Service struct { - pool *pgxpool.Pool + conn *sql.DB dal *dal.DAL key model.ControllerKey deploymentLogsSink *deploymentLogsSink @@ -250,7 +250,7 @@ type Service struct { asyncCallsLock sync.Mutex } -func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) { +func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) { key := config.Key if config.Key.IsZero() { key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port()) @@ -264,7 +264,7 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s config.ControllerTimeout = time.Second * 5 } - db, err := dal.New(ctx, pool, encryptors) + db, err := dal.New(ctx, conn, encryptors) if err != nil { return nil, fmt.Errorf("failed to create DAL: %w", err) } @@ -272,7 +272,7 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s svc := &Service{ tasks: scheduledtask.New(ctx, key, db), dal: db, - pool: pool, + conn: conn, key: key, deploymentLogsSink: newDeploymentLogsSink(ctx, db), clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)), @@ -283,7 +283,7 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s svc.routes.Store(map[string][]dal.Route{}) svc.schema.Store(&schema.Schema{}) - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, pool, svc.tasks, svc.callWithRequest) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, conn, svc.tasks, svc.callWithRequest) svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, cronSvc) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 1f555a1f7..c892c7000 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -2,6 +2,7 @@ package cronjobs import ( "context" + "database/sql" "encoding/json" "errors" "fmt" @@ -12,7 +13,6 @@ import ( "github.com/alecthomas/types/optional" "github.com/alecthomas/types/pubsub" "github.com/benbjohnson/clock" - "github.com/jackc/pgx/v5/pgxpool" "github.com/jpillora/backoff" "github.com/serialx/hashring" @@ -97,8 +97,8 @@ type Service struct { hashRingState atomic.Value[*hashRingState] } -func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, pool *pgxpool.Pool, scheduler Scheduler, call ExecuteCallFunc) *Service { - return NewForTesting(ctx, key, requestSource, config, dal.New(pool), scheduler, call, clock.New()) +func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, conn *sql.DB, scheduler Scheduler, call ExecuteCallFunc) *Service { + return NewForTesting(ctx, key, requestSource, config, dal.New(conn), scheduler, call, clock.New()) } func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { diff --git a/backend/controller/cronjobs/dal/dal.go b/backend/controller/cronjobs/dal/dal.go index 940846520..9499717fe 100644 --- a/backend/controller/cronjobs/dal/dal.go +++ b/backend/controller/cronjobs/dal/dal.go @@ -6,9 +6,8 @@ import ( "fmt" "time" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" @@ -19,8 +18,8 @@ type DAL struct { db sql.DBI } -func New(pool *pgxpool.Pool) *DAL { - return &DAL{db: sql.NewDB(pool)} +func New(conn sql.ConnI) *DAL { + return &DAL{db: sql.NewDB(conn)} } func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob { @@ -92,7 +91,7 @@ func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) // GetStaleCronJobs returns a list of cron jobs that have been executing longer than the duration func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { - rows, err := d.db.GetStaleCronJobs(ctx, duration) + rows, err := d.db.GetStaleCronJobs(ctx, sqltypes.Duration(duration)) if err != nil { return nil, fmt.Errorf("failed to get stale cron jobs: %w", dalerrs.TranslatePGError(err)) } diff --git a/backend/controller/cronjobs/sql/db.go b/backend/controller/cronjobs/sql/db.go index c4b45fb31..8a1e45d05 100644 --- a/backend/controller/cronjobs/sql/db.go +++ b/backend/controller/cronjobs/sql/db.go @@ -6,15 +6,14 @@ package sql import ( "context" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" + "database/sql" ) type DBTX interface { - Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) - Query(context.Context, string, ...interface{}) (pgx.Rows, error) - QueryRow(context.Context, string, ...interface{}) pgx.Row + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row } func New(db DBTX) *Queries { @@ -25,7 +24,7 @@ type Queries struct { db DBTX } -func (q *Queries) WithTx(tx pgx.Tx) *Queries { +func (q *Queries) WithTx(tx *sql.Tx) *Queries { return &Queries{ db: tx, } diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index e67f71b01..c5bddc7eb 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -11,10 +11,12 @@ import ( "time" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" "github.com/google/uuid" + "github.com/sqlc-dev/pqtype" ) type AsyncCallState string @@ -376,16 +378,16 @@ type AsyncCall struct { State AsyncCallState Origin string ScheduledAt time.Time - Request []byte - Response []byte + Request json.RawMessage + Response pqtype.NullRawMessage Error optional.Option[string] RemainingAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] Catching bool ParentRequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage } type Controller struct { @@ -415,7 +417,7 @@ type Deployment struct { ModuleID int64 Key model.DeploymentKey Schema *schema.Module - Labels []byte + Labels json.RawMessage MinReplicas int32 } @@ -467,7 +469,7 @@ type Lease struct { Key leases.Key CreatedAt time.Time ExpiresAt time.Time - Metadata []byte + Metadata pqtype.NullRawMessage } type Module struct { @@ -481,7 +483,7 @@ type ModuleConfiguration struct { CreatedAt time.Time Module optional.Option[string] Name string - Value []byte + Value json.RawMessage } type ModuleSecret struct { @@ -509,7 +511,7 @@ type Runner struct { Endpoint string ModuleName optional.Option[string] DeploymentID optional.Option[int64] - Labels []byte + Labels json.RawMessage } type Topic struct { @@ -530,7 +532,7 @@ type TopicEvent struct { Payload []byte Caller optional.Option[string] RequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage } type TopicSubscriber struct { @@ -541,8 +543,8 @@ type TopicSubscriber struct { DeploymentID int64 Sink schema.RefKey RetryAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] } diff --git a/backend/controller/cronjobs/sql/querier.go b/backend/controller/cronjobs/sql/querier.go index 9f8cb6a55..319aa22a6 100644 --- a/backend/controller/cronjobs/sql/querier.go +++ b/backend/controller/cronjobs/sql/querier.go @@ -8,6 +8,7 @@ import ( "context" "time" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/internal/model" ) @@ -15,7 +16,7 @@ type Querier interface { CreateCronJob(ctx context.Context, arg CreateCronJobParams) error EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) - GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) + GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) } diff --git a/backend/controller/cronjobs/sql/queries.sql.go b/backend/controller/cronjobs/sql/queries.sql.go index 5199dc158..fe741663d 100644 --- a/backend/controller/cronjobs/sql/queries.sql.go +++ b/backend/controller/cronjobs/sql/queries.sql.go @@ -9,7 +9,9 @@ import ( "context" "time" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/internal/model" + "github.com/lib/pq" ) const createCronJob = `-- name: CreateCronJob :exec @@ -35,7 +37,7 @@ type CreateCronJobParams struct { } func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error { - _, err := q.db.Exec(ctx, createCronJob, + _, err := q.db.ExecContext(ctx, createCronJob, arg.Key, arg.DeploymentKey, arg.ModuleName, @@ -75,7 +77,7 @@ type EndCronJobRow struct { } func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { - row := q.db.QueryRow(ctx, endCronJob, nextExecution, key, startTime) + row := q.db.QueryRowContext(ctx, endCronJob, nextExecution, key, startTime) var i EndCronJobRow err := row.Scan( &i.Key, @@ -109,7 +111,7 @@ type GetCronJobsRow struct { } func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { - rows, err := q.db.Query(ctx, getCronJobs) + rows, err := q.db.QueryContext(ctx, getCronJobs) if err != nil { return nil, err } @@ -131,6 +133,9 @@ func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -156,8 +161,8 @@ type GetStaleCronJobsRow struct { State model.CronJobState } -func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) { - rows, err := q.db.Query(ctx, getStaleCronJobs, dollar_1) +func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) { + rows, err := q.db.QueryContext(ctx, getStaleCronJobs, dollar_1) if err != nil { return nil, err } @@ -179,6 +184,9 @@ func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -221,7 +229,7 @@ type StartCronJobsRow struct { } func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) { - rows, err := q.db.Query(ctx, startCronJobs, keys) + rows, err := q.db.QueryContext(ctx, startCronJobs, pq.Array(keys)) if err != nil { return nil, err } @@ -245,6 +253,9 @@ func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCron } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 02c576f93..696bb9fdf 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -12,6 +12,7 @@ import ( "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" ) @@ -101,7 +102,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) defer tx.CommitOrRollback(ctx, &err) ttl := time.Second * 5 - row, err := tx.db.AcquireAsyncCall(ctx, ttl) + row, err := tx.db.AcquireAsyncCall(ctx, sqltypes.Duration(ttl)) if err != nil { err = dalerrs.TranslatePGError(err) if errors.Is(err, dalerrs.ErrNotFound) { @@ -131,11 +132,11 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) ScheduledAt: row.ScheduledAt, QueueDepth: row.QueueDepth, ParentRequestKey: row.ParentRequestKey, - TraceContext: row.TraceContext, + TraceContext: row.TraceContext.RawMessage, RemainingAttempts: row.RemainingAttempts, Error: row.Error, - Backoff: row.Backoff, - MaxBackoff: row.MaxBackoff, + Backoff: time.Duration(row.Backoff), + MaxBackoff: time.Duration(row.MaxBackoff), Catching: row.Catching, }, nil } @@ -169,8 +170,8 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, ID: call.ID, Error: result.Get(), RemainingAttempts: call.RemainingAttempts - 1, - Backoff: min(call.Backoff*2, call.MaxBackoff), - MaxBackoff: call.MaxBackoff, + Backoff: sqltypes.Duration(min(call.Backoff*2, call.MaxBackoff)), + MaxBackoff: sqltypes.Duration(call.MaxBackoff), ScheduledAt: time.Now().Add(call.Backoff), }) if err != nil { @@ -190,8 +191,8 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, ID: call.ID, Error: result.Get(), RemainingAttempts: 0, - Backoff: call.Backoff, // maintain backoff - MaxBackoff: call.MaxBackoff, + Backoff: sqltypes.Duration(call.Backoff), // maintain backoff + MaxBackoff: sqltypes.Duration(call.MaxBackoff), ScheduledAt: scheduledAt, Catching: true, OriginalError: optional.Some(originalError), diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 1ea5f6daf..6a8d62575 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -3,6 +3,7 @@ package dal import ( "context" + stdsql "database/sql" "encoding/json" "errors" "fmt" @@ -13,10 +14,10 @@ import ( "github.com/alecthomas/types/optional" "github.com/alecthomas/types/pubsub" sets "github.com/deckarep/golang-set/v2" - "github.com/jackc/pgx/v5/pgxpool" "google.golang.org/protobuf/proto" "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" dalerrs "github.com/TBD54566975/ftl/backend/dal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" @@ -209,18 +210,12 @@ func WithReservation(ctx context.Context, reservation Reservation, fn func() err return reservation.Commit(ctx) } -func New(ctx context.Context, pool *pgxpool.Pool, encryptors *Encryptors) (*DAL, error) { - _, err := pool.Acquire(ctx) - if err != nil { - return nil, fmt.Errorf("could not acquire connection: %w", err) - } - dal := &DAL{ - db: sql.NewDB(pool), +func New(ctx context.Context, conn *stdsql.DB, encryptors *Encryptors) (*DAL, error) { + return &DAL{ + db: sql.NewDB(conn), DeploymentChanges: pubsub.New[DeploymentNotification](), encryptors: encryptors, - } - - return dal, nil + }, nil } type DAL struct { @@ -602,13 +597,13 @@ func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error { // KillStaleRunners deletes runners that have not had heartbeats for the given duration. func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error) { - count, err := d.db.KillStaleRunners(ctx, age) + count, err := d.db.KillStaleRunners(ctx, sqltypes.Duration(age)) return count, err } // KillStaleControllers deletes controllers that have not had heartbeats for the given duration. func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error) { - count, err := d.db.KillStaleControllers(ctx, age) + count, err := d.db.KillStaleControllers(ctx, sqltypes.Duration(age)) return count, err } @@ -946,7 +941,7 @@ func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error) { var runner optional.Option[ProcessRunner] if endpoint, ok := row.Endpoint.Get(); ok { var labels model.Labels - if err := json.Unmarshal(row.RunnerLabels, &labels); err != nil { + if err := json.Unmarshal(row.RunnerLabels.RawMessage, &labels); err != nil { return Process{}, fmt.Errorf("invalid labels JSON for runner %s: %w", row.RunnerKey, err) } @@ -1170,7 +1165,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { } func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { - count, err := d.db.DeleteOldEvents(ctx, age, eventType) + count, err := d.db.DeleteOldEvents(ctx, sqltypes.Duration(age), eventType) return count, dalerrs.TranslatePGError(err) } diff --git a/backend/controller/dal/events.go b/backend/controller/dal/events.go index 7250e047a..8889d1c43 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/dal/events.go @@ -2,13 +2,13 @@ package dal import ( "context" + stdsql "database/sql" "encoding/json" "fmt" "strconv" "time" "github.com/alecthomas/types/optional" - "github.com/jackc/pgx/v5" "github.com/TBD54566975/ftl/backend/controller/sql" dalerrs "github.com/TBD54566975/ftl/backend/dal" @@ -260,10 +260,11 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter deploymentQuery += ` WHERE key = ANY($1::TEXT[])` deploymentArgs = append(deploymentArgs, filter.deployments) } - rows, err := d.db.Conn().Query(ctx, deploymentQuery, deploymentArgs...) + rows, err := d.db.Conn().QueryContext(ctx, deploymentQuery, deploymentArgs...) if err != nil { return nil, dalerrs.TranslatePGError(err) } + defer rows.Close() // nolint:errcheck deploymentIDs := []int64{} for rows.Next() { var id int64 @@ -315,7 +316,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter q += fmt.Sprintf(" LIMIT %d", limit) // Issue query. - rows, err = d.db.Conn().Query(ctx, q, args...) + rows, err = d.db.Conn().QueryContext(ctx, q, args...) if err != nil { return nil, fmt.Errorf("%s: %w", q, dalerrs.TranslatePGError(err)) } @@ -328,7 +329,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter return events, nil } -func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows pgx.Rows) ([]Event, error) { +func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]Event, error) { var out []Event for rows.Next() { row := eventRow{} diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index 750a94825..b463b39a5 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -12,6 +12,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" ) @@ -42,8 +43,8 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi Origin: origin.String(), Request: encryptedRequest, RemainingAttempts: int32(retryParams.Count), - Backoff: retryParams.MinBackoff, - MaxBackoff: retryParams.MaxBackoff, + Backoff: sqltypes.Duration(retryParams.MinBackoff), + MaxBackoff: sqltypes.Duration(retryParams.MaxBackoff), CatchVerb: retryParams.Catch, }) observability.AsyncCalls.Created(ctx, destinationState, retryParams.Catch, origin.String(), int64(retryParams.Count), err) diff --git a/backend/controller/dal/lease.go b/backend/controller/dal/lease.go index d29625764..7bcfc64fc 100644 --- a/backend/controller/dal/lease.go +++ b/backend/controller/dal/lease.go @@ -9,9 +9,11 @@ import ( "github.com/alecthomas/types/optional" "github.com/google/uuid" + "github.com/sqlc-dev/pqtype" "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/internal/log" ) @@ -44,7 +46,7 @@ func (l *Lease) renew(ctx context.Context, cancelCtx context.CancelFunc) { case <-time.After(leaseRenewalInterval): logger.Tracef("Renewing lease") ctx, cancel := context.WithTimeout(ctx, leaseRenewalInterval) - _, err := l.db.RenewLease(ctx, l.ttl, l.idempotencyKey, l.key) + _, err := l.db.RenewLease(ctx, sqltypes.Duration(l.ttl), l.idempotencyKey, l.key) cancel() if err != nil { @@ -94,7 +96,7 @@ func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duratio return nil, nil, fmt.Errorf("failed to marshal lease metadata: %w", err) } } - idempotencyKey, err := d.db.NewLease(ctx, key, ttl, metadataBytes) + idempotencyKey, err := d.db.NewLease(ctx, key, sqltypes.Duration(ttl), pqtype.NullRawMessage{RawMessage: metadataBytes}) if err != nil { err = dalerrs.TranslatePGError(err) if errors.Is(err, dalerrs.ErrConflict) { @@ -128,7 +130,7 @@ func (d *DAL) GetLeaseInfo(ctx context.Context, key leases.Key, metadata any) (e if err != nil { return expiry, dalerrs.TranslatePGError(err) } - if err := json.Unmarshal(l.Metadata, metadata); err != nil { + if err := json.Unmarshal(l.Metadata.RawMessage, metadata); err != nil { return expiry, fmt.Errorf("could not unmarshal lease metadata: %w", err) } return l.ExpiresAt, nil diff --git a/backend/controller/dal/lease_test.go b/backend/controller/dal/lease_test.go index 5b73d84bb..0c6531cec 100644 --- a/backend/controller/dal/lease_test.go +++ b/backend/controller/dal/lease_test.go @@ -21,7 +21,7 @@ func leaseExists(t *testing.T, conn sql.ConnI, idempotencyKey uuid.UUID, key lea t.Helper() var count int err := dalerrs.TranslatePGError(conn. - QueryRow(context.Background(), "SELECT COUNT(*) FROM leases WHERE idempotency_key = $1 AND key = $2", idempotencyKey, key). + QueryRowContext(context.Background(), "SELECT COUNT(*) FROM leases WHERE idempotency_key = $1 AND key = $2", idempotencyKey, key). Scan(&count)) if errors.Is(err, dalerrs.ErrNotFound) { return false diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 699799dae..667248262 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -11,6 +11,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/log" @@ -91,7 +92,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t successful := 0 for _, subscription := range subs { - nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor) + nextCursor, err := tx.db.GetNextEventForSubscription(ctx, sqltypes.Duration(eventConsumptionDelay), subscription.Topic, subscription.Cursor) if err != nil { observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) return 0, fmt.Errorf("failed to get next cursor: %w", dalerrs.TranslatePGError(err)) @@ -133,7 +134,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t Backoff: subscriber.Backoff, MaxBackoff: subscriber.MaxBackoff, ParentRequestKey: nextCursor.RequestKey, - TraceContext: nextCursor.TraceContext, + TraceContext: nextCursor.TraceContext.RawMessage, CatchVerb: subscriber.CatchVerb, }) observability.AsyncCalls.Created(ctx, subscriber.Sink, subscriber.CatchVerb, origin.String(), int64(subscriber.RetryAttempts), err) @@ -300,8 +301,8 @@ func (d *DAL) createSubscribers(ctx context.Context, tx *sql.Tx, key model.Deplo Deployment: key, Sink: sinkRef, RetryAttempts: int32(retryParams.Count), - Backoff: retryParams.MinBackoff, - MaxBackoff: retryParams.MaxBackoff, + Backoff: sqltypes.Duration(retryParams.MinBackoff), + MaxBackoff: sqltypes.Duration(retryParams.MaxBackoff), CatchVerb: retryParams.Catch, }) if err != nil { diff --git a/backend/controller/sql/conn.go b/backend/controller/sql/conn.go index 5a2498536..d3a29c87c 100644 --- a/backend/controller/sql/conn.go +++ b/backend/controller/sql/conn.go @@ -2,10 +2,9 @@ package sql import ( "context" + "database/sql" "errors" "fmt" - - "github.com/jackc/pgx/v5" ) type DBI interface { @@ -16,7 +15,7 @@ type DBI interface { type ConnI interface { DBTX - Begin(ctx context.Context) (pgx.Tx, error) + Begin() (*sql.Tx, error) } type DB struct { @@ -31,32 +30,36 @@ func NewDB(conn ConnI) *DB { func (d *DB) Conn() ConnI { return d.conn } func (d *DB) Begin(ctx context.Context) (*Tx, error) { - tx, err := d.conn.Begin(ctx) + tx, err := d.conn.Begin() if err != nil { return nil, err } return &Tx{tx: tx, Queries: New(tx)}, nil } +type noopSubConn struct { + DBTX +} + +func (noopSubConn) Begin() (*sql.Tx, error) { + return nil, errors.New("sql: not implemented") +} + type Tx struct { - tx pgx.Tx + tx *sql.Tx *Queries } -func (t *Tx) Conn() ConnI { return t.tx } +func (t *Tx) Conn() ConnI { return noopSubConn{t.tx} } -func (t *Tx) Tx() pgx.Tx { return t.tx } +func (t *Tx) Tx() *sql.Tx { return t.tx } func (t *Tx) Begin(ctx context.Context) (*Tx, error) { - _, err := t.tx.Begin(ctx) - if err != nil { - return nil, fmt.Errorf("beginning transaction: %w", err) - } - return &Tx{tx: t.tx, Queries: t.Queries}, nil + return nil, fmt.Errorf("cannot nest transactions") } func (t *Tx) Commit(ctx context.Context) error { - err := t.tx.Commit(ctx) + err := t.tx.Commit() if err != nil { return fmt.Errorf("committing transaction: %w", err) } @@ -65,7 +68,7 @@ func (t *Tx) Commit(ctx context.Context) error { } func (t *Tx) Rollback(ctx context.Context) error { - err := t.tx.Rollback(ctx) + err := t.tx.Rollback() if err != nil { return fmt.Errorf("rolling back transaction: %w", err) } diff --git a/backend/controller/sql/databasetesting/devel.go b/backend/controller/sql/databasetesting/devel.go index 96f6cb44c..b1c4e3dd1 100644 --- a/backend/controller/sql/databasetesting/devel.go +++ b/backend/controller/sql/databasetesting/devel.go @@ -2,11 +2,13 @@ package databasetesting import ( "context" + stdsql "database/sql" "fmt" + "net/url" + "strings" "time" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" + _ "github.com/jackc/pgx/v5/stdlib" // pgx driver "github.com/TBD54566975/ftl/backend/controller/sql" "github.com/TBD54566975/ftl/internal/log" @@ -15,20 +17,21 @@ import ( // CreateForDevel creates and migrates a new database for development or testing. // // If "recreate" is true, the database will be dropped and recreated. -func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*pgxpool.Pool, error) { +func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB, error) { logger := log.FromContext(ctx) - config, err := pgx.ParseConfig(dsn) + config, err := url.Parse(dsn) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to parse DSN: %w", err) } - noDBDSN := config.Copy() - noDBDSN.Database = "" - var conn *pgx.Conn + noDBDSN := *config + noDBDSN.Path = "" // Remove the database name. + + var conn *stdsql.DB for range 10 { - conn, err = pgx.ConnectConfig(ctx, noDBDSN) + conn, err = stdsql.Open("pgx", noDBDSN.String()) if err == nil { - defer conn.Close(ctx) + defer conn.Close() break } logger.Debugf("Waiting for database to be ready: %v", err) @@ -43,31 +46,33 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*pgxpool.Po return nil, fmt.Errorf("database not ready after 10 tries: %w", err) } + dbName := strings.TrimPrefix(config.Path, "/") + if recreate { // Terminate any dangling connections. - _, err = conn.Exec(ctx, ` + _, err = conn.ExecContext(ctx, ` SELECT pid, pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = $1 AND pid <> pg_backend_pid()`, - config.Database) + dbName) if err != nil { return nil, err } - _, err = conn.Exec(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS %q", config.Database)) + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS %q", dbName)) if err != nil { return nil, err } } - _, _ = conn.Exec(ctx, fmt.Sprintf("CREATE DATABASE %q", config.Database)) //nolint:errcheck // PG doesn't support "IF NOT EXISTS" so instead we just ignore any error. + _, _ = conn.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE %q", dbName)) //nolint:errcheck // PG doesn't support "IF NOT EXISTS" so instead we just ignore any error. err = sql.Migrate(ctx, dsn, log.Debug) if err != nil { return nil, err } - realConn, err := pgxpool.New(ctx, dsn) + realConn, err := stdsql.Open("pgx", dsn) if err != nil { return nil, err } @@ -75,7 +80,7 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*pgxpool.Po // This includes things like resetting the state of async calls, leases, // controller/runner registration, etc. but not anything more. if !recreate { - _, err = realConn.Exec(ctx, ` + _, err = realConn.ExecContext(ctx, ` WITH deleted AS ( DELETE FROM async_calls RETURNING 1 diff --git a/backend/controller/sql/db.go b/backend/controller/sql/db.go index c4b45fb31..8a1e45d05 100644 --- a/backend/controller/sql/db.go +++ b/backend/controller/sql/db.go @@ -6,15 +6,14 @@ package sql import ( "context" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" + "database/sql" ) type DBTX interface { - Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) - Query(context.Context, string, ...interface{}) (pgx.Rows, error) - QueryRow(context.Context, string, ...interface{}) pgx.Row + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row } func New(db DBTX) *Queries { @@ -25,7 +24,7 @@ type Queries struct { db DBTX } -func (q *Queries) WithTx(tx pgx.Tx) *Queries { +func (q *Queries) WithTx(tx *sql.Tx) *Queries { return &Queries{ db: tx, } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index e67f71b01..c5bddc7eb 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -11,10 +11,12 @@ import ( "time" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" "github.com/google/uuid" + "github.com/sqlc-dev/pqtype" ) type AsyncCallState string @@ -376,16 +378,16 @@ type AsyncCall struct { State AsyncCallState Origin string ScheduledAt time.Time - Request []byte - Response []byte + Request json.RawMessage + Response pqtype.NullRawMessage Error optional.Option[string] RemainingAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] Catching bool ParentRequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage } type Controller struct { @@ -415,7 +417,7 @@ type Deployment struct { ModuleID int64 Key model.DeploymentKey Schema *schema.Module - Labels []byte + Labels json.RawMessage MinReplicas int32 } @@ -467,7 +469,7 @@ type Lease struct { Key leases.Key CreatedAt time.Time ExpiresAt time.Time - Metadata []byte + Metadata pqtype.NullRawMessage } type Module struct { @@ -481,7 +483,7 @@ type ModuleConfiguration struct { CreatedAt time.Time Module optional.Option[string] Name string - Value []byte + Value json.RawMessage } type ModuleSecret struct { @@ -509,7 +511,7 @@ type Runner struct { Endpoint string ModuleName optional.Option[string] DeploymentID optional.Option[int64] - Labels []byte + Labels json.RawMessage } type Topic struct { @@ -530,7 +532,7 @@ type TopicEvent struct { Payload []byte Caller optional.Option[string] RequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage } type TopicSubscriber struct { @@ -541,8 +543,8 @@ type TopicSubscriber struct { DeploymentID int64 Sink schema.RefKey RetryAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] } diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 1331f7e47..f34f4c44e 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -6,19 +6,22 @@ package sql import ( "context" + "encoding/json" "time" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" "github.com/google/uuid" + "github.com/sqlc-dev/pqtype" ) type Querier interface { // Reserve a pending async call for execution, returning the associated lease // reservation key and accompanying metadata. - AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error) + AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error AsyncCallQueueDepth(ctx context.Context) (int64, error) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error @@ -30,7 +33,7 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error - DeleteOldEvents(ctx context.Context, timeout time.Duration, type_ EventType) (int64, error) + DeleteOldEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) @@ -62,12 +65,12 @@ type Querier interface { GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) GetFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (FsmInstance, error) - GetIdleRunners(ctx context.Context, labels []byte, limit int64) ([]Runner, error) + GetIdleRunners(ctx context.Context, labels json.RawMessage, limit int64) ([]Runner, error) // Get the runner endpoints corresponding to the given ingress route. GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) GetLeaseInfo(ctx context.Context, key leases.Key) (GetLeaseInfoRow, error) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) - GetNextEventForSubscription(ctx context.Context, consumptionDelay time.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) + GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) // Retrieve routing information for a runner. @@ -77,7 +80,7 @@ type Querier interface { GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) - GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) + GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) // Results may not be ready to be scheduled yet due to event consumption delay // Sorting ensures that brand new events (that may not be ready for consumption) @@ -92,15 +95,15 @@ type Querier interface { InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error // Mark any controller entries that haven't been updated recently as dead. - KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error) - KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) + KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) + KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) - NewLease(ctx context.Context, key leases.Key, ttl time.Duration, metadata []byte) (uuid.UUID, error) + NewLease(ctx context.Context, key leases.Key, ttl sqltypes.Duration, metadata pqtype.NullRawMessage) (uuid.UUID, error) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) - RenewLease(ctx context.Context, ttl time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) + RenewLease(ctx context.Context, ttl sqltypes.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) // Find an idle runner and reserve it for the given deployment. - ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) + ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels json.RawMessage) (Runner, error) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) @@ -108,7 +111,7 @@ type Querier interface { // // "key" is the unique identifier for the FSM execution. StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) - SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error) + SucceedAsyncCall(ctx context.Context, response json.RawMessage, iD int64) (bool, error) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index a7815e118..224402327 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -11,10 +11,13 @@ import ( "time" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" "github.com/google/uuid" + "github.com/lib/pq" + "github.com/sqlc-dev/pqtype" ) const acquireAsyncCall = `-- name: AcquireAsyncCall :one @@ -64,21 +67,21 @@ type AcquireAsyncCallRow struct { Origin string Verb schema.RefKey CatchVerb optional.Option[schema.RefKey] - Request []byte + Request json.RawMessage ScheduledAt time.Time RemainingAttempts int32 Error optional.Option[string] - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration ParentRequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage Catching bool } // Reserve a pending async call for execution, returning the associated lease // reservation key and accompanying metadata. -func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error) { - row := q.db.QueryRow(ctx, acquireAsyncCall, ttl) +func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl sqltypes.Duration) (AcquireAsyncCallRow, error) { + row := q.db.QueryRowContext(ctx, acquireAsyncCall, ttl) var i AcquireAsyncCallRow err := row.Scan( &i.AsyncCallID, @@ -114,7 +117,7 @@ type AssociateArtefactWithDeploymentParams struct { } func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error { - _, err := q.db.Exec(ctx, associateArtefactWithDeployment, + _, err := q.db.ExecContext(ctx, associateArtefactWithDeployment, arg.Key, arg.ArtefactID, arg.Executable, @@ -130,7 +133,7 @@ WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') ` func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { - row := q.db.QueryRow(ctx, asyncCallQueueDepth) + row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) var count int64 err := row.Scan(&count) return count, err @@ -149,7 +152,7 @@ WHERE key = $1::subscription_key ` func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error { - _, err := q.db.Exec(ctx, beginConsumingTopicEvent, subscription, event) + _, err := q.db.ExecContext(ctx, beginConsumingTopicEvent, subscription, event) return err } @@ -166,7 +169,7 @@ WHERE name = $1::TEXT ` func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error { - _, err := q.db.Exec(ctx, completeEventForSubscription, name, module) + _, err := q.db.ExecContext(ctx, completeEventForSubscription, name, module) return err } @@ -178,7 +181,7 @@ RETURNING id // Create a new artefact and return the artefact ID. func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) { - row := q.db.QueryRow(ctx, createArtefact, digest, content) + row := q.db.QueryRowContext(ctx, createArtefact, digest, content) var id int64 err := row.Scan(&id) return id, err @@ -213,17 +216,17 @@ RETURNING id type CreateAsyncCallParams struct { Verb schema.RefKey Origin string - Request []byte + Request json.RawMessage RemainingAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] ParentRequestKey optional.Option[string] - TraceContext []byte + TraceContext json.RawMessage } func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { - row := q.db.QueryRow(ctx, createAsyncCall, + row := q.db.QueryRowContext(ctx, createAsyncCall, arg.Verb, arg.Origin, arg.Request, @@ -262,7 +265,7 @@ type CreateCronJobParams struct { } func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error { - _, err := q.db.Exec(ctx, createCronJob, + _, err := q.db.ExecContext(ctx, createCronJob, arg.Key, arg.DeploymentKey, arg.ModuleName, @@ -280,7 +283,7 @@ VALUES ((SELECT id FROM modules WHERE name = $1::TEXT LIMIT 1), $2::BYTEA, $3::d ` func (q *Queries) CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error { - _, err := q.db.Exec(ctx, createDeployment, moduleName, schema, key) + _, err := q.db.ExecContext(ctx, createDeployment, moduleName, schema, key) return err } @@ -298,7 +301,7 @@ type CreateIngressRouteParams struct { } func (q *Queries) CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error { - _, err := q.db.Exec(ctx, createIngressRoute, + _, err := q.db.ExecContext(ctx, createIngressRoute, arg.Key, arg.Module, arg.Verb, @@ -314,7 +317,7 @@ VALUES ($1, $2, $3) ` func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error { - _, err := q.db.Exec(ctx, createRequest, origin, key, sourceAddr) + _, err := q.db.ExecContext(ctx, createRequest, origin, key, sourceAddr) return err } @@ -329,8 +332,8 @@ SELECT COUNT(*) FROM deleted ` -func (q *Queries) DeleteOldEvents(ctx context.Context, timeout time.Duration, type_ EventType) (int64, error) { - row := q.db.QueryRow(ctx, deleteOldEvents, timeout, type_) +func (q *Queries) DeleteOldEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { + row := q.db.QueryRowContext(ctx, deleteOldEvents, timeout, type_) var count int64 err := row.Scan(&count) return count, err @@ -347,7 +350,7 @@ RETURNING topic_subscribers.key ` func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) { - rows, err := q.db.Query(ctx, deleteSubscribers, deployment) + rows, err := q.db.QueryContext(ctx, deleteSubscribers, deployment) if err != nil { return nil, err } @@ -360,6 +363,9 @@ func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.Deploy } items = append(items, key) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -377,7 +383,7 @@ RETURNING topic_subscriptions.key ` func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) { - rows, err := q.db.Query(ctx, deleteSubscriptions, deployment) + rows, err := q.db.QueryContext(ctx, deleteSubscriptions, deployment) if err != nil { return nil, err } @@ -390,6 +396,9 @@ func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.Depl } items = append(items, key) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -408,7 +417,7 @@ FROM matches ` func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) { - row := q.db.QueryRow(ctx, deregisterRunner, key) + row := q.db.QueryRowContext(ctx, deregisterRunner, key) var count int64 err := row.Scan(&count) return count, err @@ -442,7 +451,7 @@ type EndCronJobRow struct { } func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { - row := q.db.QueryRow(ctx, endCronJob, nextExecution, key, startTime) + row := q.db.QueryRowContext(ctx, endCronJob, nextExecution, key, startTime) var i EndCronJobRow err := row.Scan( &i.Key, @@ -468,7 +477,7 @@ FROM expired ` func (q *Queries) ExpireLeases(ctx context.Context) (int64, error) { - row := q.db.QueryRow(ctx, expireLeases) + row := q.db.QueryRowContext(ctx, expireLeases) var count int64 err := row.Scan(&count) return count, err @@ -488,7 +497,7 @@ FROM rows ` func (q *Queries) ExpireRunnerReservations(ctx context.Context) (int64, error) { - row := q.db.QueryRow(ctx, expireRunnerReservations) + row := q.db.QueryRowContext(ctx, expireRunnerReservations) var count int64 err := row.Scan(&count) return count, err @@ -504,7 +513,7 @@ RETURNING true ` func (q *Queries) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error) { - row := q.db.QueryRow(ctx, failAsyncCall, error, iD) + row := q.db.QueryRowContext(ctx, failAsyncCall, error, iD) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -547,8 +556,8 @@ RETURNING true type FailAsyncCallWithRetryParams struct { RemainingAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration ScheduledAt time.Time Catching bool OriginalError optional.Option[string] @@ -557,7 +566,7 @@ type FailAsyncCallWithRetryParams struct { } func (q *Queries) FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error) { - row := q.db.QueryRow(ctx, failAsyncCallWithRetry, + row := q.db.QueryRowContext(ctx, failAsyncCallWithRetry, arg.RemainingAttempts, arg.Backoff, arg.MaxBackoff, @@ -585,7 +594,7 @@ RETURNING true ` func (q *Queries) FailFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) { - row := q.db.QueryRow(ctx, failFSMInstance, fsm, key) + row := q.db.QueryRowContext(ctx, failFSMInstance, fsm, key) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -605,7 +614,7 @@ RETURNING true // Mark an FSM transition as completed, updating the current state and clearing the async call ID. func (q *Queries) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, key string) (bool, error) { - row := q.db.QueryRow(ctx, finishFSMTransition, fsm, key) + row := q.db.QueryRowContext(ctx, finishFSMTransition, fsm, key) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -619,7 +628,7 @@ ORDER BY c.key ` func (q *Queries) GetActiveControllers(ctx context.Context) ([]Controller, error) { - rows, err := q.db.Query(ctx, getActiveControllers) + rows, err := q.db.QueryContext(ctx, getActiveControllers) if err != nil { return nil, err } @@ -639,6 +648,9 @@ func (q *Queries) GetActiveControllers(ctx context.Context) ([]Controller, error } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -655,7 +667,7 @@ type GetActiveDeploymentSchemasRow struct { } func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) { - rows, err := q.db.Query(ctx, getActiveDeploymentSchemas) + rows, err := q.db.QueryContext(ctx, getActiveDeploymentSchemas) if err != nil { return nil, err } @@ -668,6 +680,9 @@ func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDe } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -692,7 +707,7 @@ type GetActiveDeploymentsRow struct { } func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) { - rows, err := q.db.Query(ctx, getActiveDeployments) + rows, err := q.db.QueryContext(ctx, getActiveDeployments) if err != nil { return nil, err } @@ -716,6 +731,9 @@ func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeployme } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -738,7 +756,7 @@ type GetActiveIngressRoutesRow struct { } func (q *Queries) GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngressRoutesRow, error) { - rows, err := q.db.Query(ctx, getActiveIngressRoutes) + rows, err := q.db.QueryContext(ctx, getActiveIngressRoutes) if err != nil { return nil, err } @@ -757,6 +775,9 @@ func (q *Queries) GetActiveIngressRoutes(ctx context.Context) ([]GetActiveIngres } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -783,14 +804,14 @@ type GetActiveRunnersRow struct { RunnerKey model.RunnerKey Endpoint string State RunnerState - Labels []byte + Labels json.RawMessage LastSeen time.Time ModuleName optional.Option[string] DeploymentKey optional.Option[string] } func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error) { - rows, err := q.db.Query(ctx, getActiveRunners) + rows, err := q.db.QueryContext(ctx, getActiveRunners) if err != nil { return nil, err } @@ -811,6 +832,9 @@ func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -824,7 +848,7 @@ WHERE a.id = $3 ` func (q *Queries) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) { - row := q.db.QueryRow(ctx, getArtefactContentRange, start, count, iD) + row := q.db.QueryRowContext(ctx, getArtefactContentRange, start, count, iD) var content []byte err := row.Scan(&content) return content, err @@ -843,7 +867,7 @@ type GetArtefactDigestsRow struct { // Return the digests that exist in the database. func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]GetArtefactDigestsRow, error) { - rows, err := q.db.Query(ctx, getArtefactDigests, digests) + rows, err := q.db.QueryContext(ctx, getArtefactDigests, pq.Array(digests)) if err != nil { return nil, err } @@ -856,6 +880,9 @@ func (q *Queries) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]G } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -881,7 +908,7 @@ type GetCronJobsRow struct { } func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { - rows, err := q.db.Query(ctx, getCronJobs) + rows, err := q.db.QueryContext(ctx, getCronJobs) if err != nil { return nil, err } @@ -903,6 +930,9 @@ func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -924,7 +954,7 @@ type GetDeploymentRow struct { } func (q *Queries) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error) { - row := q.db.QueryRow(ctx, getDeployment, key) + row := q.db.QueryRowContext(ctx, getDeployment, key) var i GetDeploymentRow err := row.Scan( &i.Deployment.ID, @@ -959,7 +989,7 @@ type GetDeploymentArtefactsRow struct { // Get all artefacts matching the given digests. func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64) ([]GetDeploymentArtefactsRow, error) { - rows, err := q.db.Query(ctx, getDeploymentArtefacts, deploymentID) + rows, err := q.db.QueryContext(ctx, getDeploymentArtefacts, deploymentID) if err != nil { return nil, err } @@ -979,6 +1009,9 @@ func (q *Queries) GetDeploymentArtefacts(ctx context.Context, deploymentID int64 } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -992,7 +1025,7 @@ WHERE id = ANY ($1::BIGINT[]) ` func (q *Queries) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deployment, error) { - rows, err := q.db.Query(ctx, getDeploymentsByID, ids) + rows, err := q.db.QueryContext(ctx, getDeploymentsByID, pq.Array(ids)) if err != nil { return nil, err } @@ -1013,6 +1046,9 @@ func (q *Queries) GetDeploymentsByID(ctx context.Context, ids []int64) ([]Deploy } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1042,7 +1078,7 @@ type GetDeploymentsNeedingReconciliationRow struct { // Get deployments that have a mismatch between the number of assigned and required replicas. func (q *Queries) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]GetDeploymentsNeedingReconciliationRow, error) { - rows, err := q.db.Query(ctx, getDeploymentsNeedingReconciliation) + rows, err := q.db.QueryContext(ctx, getDeploymentsNeedingReconciliation) if err != nil { return nil, err } @@ -1061,6 +1097,9 @@ func (q *Queries) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Ge } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1091,7 +1130,7 @@ type GetDeploymentsWithArtefactsRow struct { // Get all deployments that have artefacts matching the given digests. func (q *Queries) GetDeploymentsWithArtefacts(ctx context.Context, digests [][]byte, schema []byte, count int64) ([]GetDeploymentsWithArtefactsRow, error) { - rows, err := q.db.Query(ctx, getDeploymentsWithArtefacts, digests, schema, count) + rows, err := q.db.QueryContext(ctx, getDeploymentsWithArtefacts, pq.Array(digests), schema, count) if err != nil { return nil, err } @@ -1110,6 +1149,9 @@ func (q *Queries) GetDeploymentsWithArtefacts(ctx context.Context, digests [][]b } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1131,7 +1173,7 @@ type GetDeploymentsWithMinReplicasRow struct { } func (q *Queries) GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeploymentsWithMinReplicasRow, error) { - rows, err := q.db.Query(ctx, getDeploymentsWithMinReplicas) + rows, err := q.db.QueryContext(ctx, getDeploymentsWithMinReplicas) if err != nil { return nil, err } @@ -1154,6 +1196,9 @@ func (q *Queries) GetDeploymentsWithMinReplicas(ctx context.Context) ([]GetDeplo } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1175,7 +1220,7 @@ type GetExistingDeploymentForModuleRow struct { ModuleID int64 Key model.DeploymentKey Schema *schema.Module - Labels []byte + Labels json.RawMessage MinReplicas int32 ID_2 int64 Language string @@ -1183,7 +1228,7 @@ type GetExistingDeploymentForModuleRow struct { } func (q *Queries) GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) { - row := q.db.QueryRow(ctx, getExistingDeploymentForModule, name) + row := q.db.QueryRowContext(ctx, getExistingDeploymentForModule, name) var i GetExistingDeploymentForModuleRow err := row.Scan( &i.ID, @@ -1207,7 +1252,7 @@ WHERE fsm = $1::schema_ref AND key = $2 ` func (q *Queries) GetFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (FsmInstance, error) { - row := q.db.QueryRow(ctx, getFSMInstance, fsm, key) + row := q.db.QueryRowContext(ctx, getFSMInstance, fsm, key) var i FsmInstance err := row.Scan( &i.ID, @@ -1231,8 +1276,8 @@ WHERE labels @> $1::jsonb LIMIT $2 ` -func (q *Queries) GetIdleRunners(ctx context.Context, labels []byte, limit int64) ([]Runner, error) { - rows, err := q.db.Query(ctx, getIdleRunners, labels, limit) +func (q *Queries) GetIdleRunners(ctx context.Context, labels json.RawMessage, limit int64) ([]Runner, error) { + rows, err := q.db.QueryContext(ctx, getIdleRunners, labels, limit) if err != nil { return nil, err } @@ -1256,6 +1301,9 @@ func (q *Queries) GetIdleRunners(ctx context.Context, labels []byte, limit int64 } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1282,7 +1330,7 @@ type GetIngressRoutesRow struct { // Get the runner endpoints corresponding to the given ingress route. func (q *Queries) GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) { - rows, err := q.db.Query(ctx, getIngressRoutes, method) + rows, err := q.db.QueryContext(ctx, getIngressRoutes, method) if err != nil { return nil, err } @@ -1302,6 +1350,9 @@ func (q *Queries) GetIngressRoutes(ctx context.Context, method string) ([]GetIng } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1314,11 +1365,11 @@ SELECT expires_at, metadata FROM leases WHERE key = $1::lease_key type GetLeaseInfoRow struct { ExpiresAt time.Time - Metadata []byte + Metadata pqtype.NullRawMessage } func (q *Queries) GetLeaseInfo(ctx context.Context, key leases.Key) (GetLeaseInfoRow, error) { - row := q.db.QueryRow(ctx, getLeaseInfo, key) + row := q.db.QueryRowContext(ctx, getLeaseInfo, key) var i GetLeaseInfoRow err := row.Scan(&i.ExpiresAt, &i.Metadata) return i, err @@ -1331,7 +1382,7 @@ WHERE id = ANY ($1::BIGINT[]) ` func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) { - rows, err := q.db.Query(ctx, getModulesByID, ids) + rows, err := q.db.QueryContext(ctx, getModulesByID, pq.Array(ids)) if err != nil { return nil, err } @@ -1344,6 +1395,9 @@ func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, er } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1379,12 +1433,12 @@ type GetNextEventForSubscriptionRow struct { CreatedAt optional.Option[time.Time] Caller optional.Option[string] RequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage Ready bool } -func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay time.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { - row := q.db.QueryRow(ctx, getNextEventForSubscription, consumptionDelay, topic, cursor) +func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { + row := q.db.QueryRowContext(ctx, getNextEventForSubscription, consumptionDelay, topic, cursor) var i GetNextEventForSubscriptionRow err := row.Scan( &i.Event, @@ -1414,14 +1468,14 @@ ORDER BY d.key type GetProcessListRow struct { MinReplicas int32 DeploymentKey model.DeploymentKey - DeploymentLabels []byte + DeploymentLabels json.RawMessage RunnerKey optional.Option[model.RunnerKey] Endpoint optional.Option[string] - RunnerLabels []byte + RunnerLabels pqtype.NullRawMessage } func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) { - rows, err := q.db.Query(ctx, getProcessList) + rows, err := q.db.QueryContext(ctx, getProcessList) if err != nil { return nil, err } @@ -1441,6 +1495,9 @@ func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, erro } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1464,13 +1521,13 @@ LIMIT 1 type GetRandomSubscriberRow struct { Sink schema.RefKey RetryAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] } func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) { - row := q.db.QueryRow(ctx, getRandomSubscriber, key) + row := q.db.QueryRowContext(ctx, getRandomSubscriber, key) var i GetRandomSubscriberRow err := row.Scan( &i.Sink, @@ -1499,7 +1556,7 @@ type GetRouteForRunnerRow struct { // Retrieve routing information for a runner. func (q *Queries) GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error) { - row := q.db.QueryRow(ctx, getRouteForRunner, key) + row := q.db.QueryRowContext(ctx, getRouteForRunner, key) var i GetRouteForRunnerRow err := row.Scan( &i.Endpoint, @@ -1528,7 +1585,7 @@ type GetRoutingTableRow struct { } func (q *Queries) GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error) { - rows, err := q.db.Query(ctx, getRoutingTable, modules) + rows, err := q.db.QueryContext(ctx, getRoutingTable, pq.Array(modules)) if err != nil { return nil, err } @@ -1546,6 +1603,9 @@ func (q *Queries) GetRoutingTable(ctx context.Context, modules []string) ([]GetR } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1571,14 +1631,14 @@ type GetRunnerRow struct { RunnerKey model.RunnerKey Endpoint string State RunnerState - Labels []byte + Labels json.RawMessage LastSeen time.Time ModuleName optional.Option[string] DeploymentKey optional.Option[string] } func (q *Queries) GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error) { - row := q.db.QueryRow(ctx, getRunner, key) + row := q.db.QueryRowContext(ctx, getRunner, key) var i GetRunnerRow err := row.Scan( &i.RunnerKey, @@ -1599,7 +1659,7 @@ WHERE key = $1::runner_key ` func (q *Queries) GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) { - row := q.db.QueryRow(ctx, getRunnerState, key) + row := q.db.QueryRowContext(ctx, getRunnerState, key) var state RunnerState err := row.Scan(&state) return state, err @@ -1623,18 +1683,18 @@ type GetRunnersForDeploymentRow struct { Endpoint string ModuleName optional.Option[string] DeploymentID optional.Option[int64] - Labels []byte + Labels json.RawMessage ID_2 int64 CreatedAt time.Time ModuleID int64 Key_2 model.DeploymentKey Schema *schema.Module - Labels_2 []byte + Labels_2 json.RawMessage MinReplicas int32 } func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) { - rows, err := q.db.Query(ctx, getRunnersForDeployment, key) + rows, err := q.db.QueryContext(ctx, getRunnersForDeployment, key) if err != nil { return nil, err } @@ -1665,6 +1725,9 @@ func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.Deploym } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1676,7 +1739,7 @@ SELECT schema FROM deployments WHERE key = $1::deployment_key ` func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) { - row := q.db.QueryRow(ctx, getSchemaForDeployment, key) + row := q.db.QueryRowContext(ctx, getSchemaForDeployment, key) var schema *schema.Module err := row.Scan(&schema) return schema, err @@ -1701,8 +1764,8 @@ type GetStaleCronJobsRow struct { State model.CronJobState } -func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) { - rows, err := q.db.Query(ctx, getStaleCronJobs, dollar_1) +func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 sqltypes.Duration) ([]GetStaleCronJobsRow, error) { + rows, err := q.db.QueryContext(ctx, getStaleCronJobs, dollar_1) if err != nil { return nil, err } @@ -1724,6 +1787,9 @@ func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1743,7 +1809,7 @@ WHERE name = $1::TEXT ` func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) { - row := q.db.QueryRow(ctx, getSubscription, column1, column2) + row := q.db.QueryRowContext(ctx, getSubscription, column1, column2) var i TopicSubscription err := row.Scan( &i.ID, @@ -1786,7 +1852,7 @@ type GetSubscriptionsNeedingUpdateRow struct { // Sorting ensures that brand new events (that may not be ready for consumption) // don't prevent older events from being consumed func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { - rows, err := q.db.Query(ctx, getSubscriptionsNeedingUpdate) + rows, err := q.db.QueryContext(ctx, getSubscriptionsNeedingUpdate) if err != nil { return nil, err } @@ -1804,6 +1870,9 @@ func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubsc } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -1817,7 +1886,7 @@ WHERE id = $1::BIGINT ` func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) { - row := q.db.QueryRow(ctx, getTopic, dollar_1) + row := q.db.QueryRowContext(ctx, getTopic, dollar_1) var i Topic err := row.Scan( &i.ID, @@ -1838,7 +1907,7 @@ WHERE id = $1::BIGINT ` func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) { - row := q.db.QueryRow(ctx, getTopicEvent, dollar_1) + row := q.db.QueryRowContext(ctx, getTopicEvent, dollar_1) var i TopicEvent err := row.Scan( &i.ID, @@ -1899,7 +1968,7 @@ type InsertCallEventParams struct { } func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error { - _, err := q.db.Exec(ctx, insertCallEvent, + _, err := q.db.ExecContext(ctx, insertCallEvent, arg.DeploymentKey, arg.RequestKey, arg.ParentRequestKey, @@ -1942,7 +2011,7 @@ type InsertDeploymentCreatedEventParams struct { } func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error { - _, err := q.db.Exec(ctx, insertDeploymentCreatedEvent, + _, err := q.db.ExecContext(ctx, insertDeploymentCreatedEvent, arg.DeploymentKey, arg.Language, arg.ModuleName, @@ -1980,7 +2049,7 @@ type InsertDeploymentUpdatedEventParams struct { } func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error { - _, err := q.db.Exec(ctx, insertDeploymentUpdatedEvent, + _, err := q.db.ExecContext(ctx, insertDeploymentUpdatedEvent, arg.DeploymentKey, arg.Language, arg.ModuleName, @@ -2010,7 +2079,7 @@ type InsertEventParams struct { } func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error { - _, err := q.db.Exec(ctx, insertEvent, + _, err := q.db.ExecContext(ctx, insertEvent, arg.DeploymentID, arg.RequestID, arg.ParentRequestID, @@ -2057,7 +2126,7 @@ type InsertLogEventParams struct { } func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error { - _, err := q.db.Exec(ctx, insertLogEvent, + _, err := q.db.ExecContext(ctx, insertLogEvent, arg.DeploymentKey, arg.RequestKey, arg.TimeStamp, @@ -2103,13 +2172,13 @@ type InsertSubscriberParams struct { Deployment model.DeploymentKey Sink schema.RefKey RetryAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] } func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { - _, err := q.db.Exec(ctx, insertSubscriber, + _, err := q.db.ExecContext(ctx, insertSubscriber, arg.Key, arg.Module, arg.SubscriptionName, @@ -2134,8 +2203,8 @@ FROM matches ` // Mark any controller entries that haven't been updated recently as dead. -func (q *Queries) KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error) { - row := q.db.QueryRow(ctx, killStaleControllers, timeout) +func (q *Queries) KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) { + row := q.db.QueryRowContext(ctx, killStaleControllers, timeout) var count int64 err := row.Scan(&count) return count, err @@ -2152,8 +2221,8 @@ SELECT COUNT(*) FROM matches ` -func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) { - row := q.db.QueryRow(ctx, killStaleRunners, timeout) +func (q *Queries) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) { + row := q.db.QueryRowContext(ctx, killStaleRunners, timeout) var count int64 err := row.Scan(&count) return count, err @@ -2166,7 +2235,7 @@ WHERE id = $1 ` func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) { - row := q.db.QueryRow(ctx, loadAsyncCall, id) + row := q.db.QueryRowContext(ctx, loadAsyncCall, id) var i AsyncCall err := row.Scan( &i.ID, @@ -2206,8 +2275,8 @@ VALUES ( RETURNING idempotency_key ` -func (q *Queries) NewLease(ctx context.Context, key leases.Key, ttl time.Duration, metadata []byte) (uuid.UUID, error) { - row := q.db.QueryRow(ctx, newLease, key, ttl, metadata) +func (q *Queries) NewLease(ctx context.Context, key leases.Key, ttl sqltypes.Duration, metadata pqtype.NullRawMessage) (uuid.UUID, error) { + row := q.db.QueryRowContext(ctx, newLease, key, ttl, metadata) var idempotency_key uuid.UUID err := row.Scan(&idempotency_key) return idempotency_key, err @@ -2245,11 +2314,11 @@ type PublishEventForTopicParams struct { Caller string Payload []byte RequestKey string - TraceContext []byte + TraceContext json.RawMessage } func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error { - _, err := q.db.Exec(ctx, publishEventForTopic, + _, err := q.db.ExecContext(ctx, publishEventForTopic, arg.Key, arg.Module, arg.Topic, @@ -2268,7 +2337,7 @@ RETURNING true ` func (q *Queries) ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { - row := q.db.QueryRow(ctx, releaseLease, idempotencyKey, key) + row := q.db.QueryRowContext(ctx, releaseLease, idempotencyKey, key) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -2281,8 +2350,8 @@ WHERE idempotency_key = $2 AND key = $3::lease_key RETURNING true ` -func (q *Queries) RenewLease(ctx context.Context, ttl time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { - row := q.db.QueryRow(ctx, renewLease, ttl, idempotencyKey, key) +func (q *Queries) RenewLease(ctx context.Context, ttl sqltypes.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { + row := q.db.QueryRowContext(ctx, renewLease, ttl, idempotencyKey, key) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -2307,8 +2376,8 @@ RETURNING runners.id, runners.key, runners.created, runners.last_seen, runners.r ` // Find an idle runner and reserve it for the given deployment. -func (q *Queries) ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) { - row := q.db.QueryRow(ctx, reserveRunner, reservationTimeout, deploymentKey, labels) +func (q *Queries) ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels json.RawMessage) (Runner, error) { + row := q.db.QueryRowContext(ctx, reserveRunner, reservationTimeout, deploymentKey, labels) var i Runner err := row.Scan( &i.ID, @@ -2333,7 +2402,7 @@ RETURNING 1 ` func (q *Queries) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error { - _, err := q.db.Exec(ctx, setDeploymentDesiredReplicas, key, minReplicas) + _, err := q.db.ExecContext(ctx, setDeploymentDesiredReplicas, key, minReplicas) return err } @@ -2350,7 +2419,7 @@ WHERE key = $1::subscription_key ` func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error { - _, err := q.db.Exec(ctx, setSubscriptionCursor, column1, column2) + _, err := q.db.ExecContext(ctx, setSubscriptionCursor, column1, column2) return err } @@ -2390,7 +2459,7 @@ type StartCronJobsRow struct { } func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) { - rows, err := q.db.Query(ctx, startCronJobs, keys) + rows, err := q.db.QueryContext(ctx, startCronJobs, pq.Array(keys)) if err != nil { return nil, err } @@ -2414,6 +2483,9 @@ func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCron } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -2454,7 +2526,7 @@ type StartFSMTransitionParams struct { // // "key" is the unique identifier for the FSM execution. func (q *Queries) StartFSMTransition(ctx context.Context, arg StartFSMTransitionParams) (FsmInstance, error) { - row := q.db.QueryRow(ctx, startFSMTransition, + row := q.db.QueryRowContext(ctx, startFSMTransition, arg.Fsm, arg.Key, arg.DestinationState, @@ -2485,8 +2557,8 @@ WHERE id = $2 RETURNING true ` -func (q *Queries) SucceedAsyncCall(ctx context.Context, response []byte, iD int64) (bool, error) { - row := q.db.QueryRow(ctx, succeedAsyncCall, response, iD) +func (q *Queries) SucceedAsyncCall(ctx context.Context, response json.RawMessage, iD int64) (bool, error) { + row := q.db.QueryRowContext(ctx, succeedAsyncCall, response, iD) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -2506,7 +2578,7 @@ RETURNING true ` func (q *Queries) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) { - row := q.db.QueryRow(ctx, succeedFSMInstance, fsm, key) + row := q.db.QueryRowContext(ctx, succeedFSMInstance, fsm, key) var column_1 bool err := row.Scan(&column_1) return column_1, err @@ -2522,7 +2594,7 @@ RETURNING id ` func (q *Queries) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) { - row := q.db.QueryRow(ctx, upsertController, key, endpoint) + row := q.db.QueryRowContext(ctx, upsertController, key, endpoint) var id int64 err := row.Scan(&id) return id, err @@ -2536,7 +2608,7 @@ RETURNING id ` func (q *Queries) UpsertModule(ctx context.Context, language string, name string) (int64, error) { - row := q.db.QueryRow(ctx, upsertModule, language, name) + row := q.db.QueryRowContext(ctx, upsertModule, language, name) var id int64 err := row.Scan(&id) return id, err @@ -2571,7 +2643,7 @@ type UpsertRunnerParams struct { Key model.RunnerKey Endpoint string State RunnerState - Labels []byte + Labels json.RawMessage DeploymentKey optional.Option[model.DeploymentKey] } @@ -2581,7 +2653,7 @@ type UpsertRunnerParams struct { // there is no corresponding deployment, then the deployment ID is -1 // and the parent statement will fail due to a foreign key constraint. func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error) { - row := q.db.QueryRow(ctx, upsertRunner, + row := q.db.QueryRowContext(ctx, upsertRunner, arg.Key, arg.Endpoint, arg.State, @@ -2640,7 +2712,7 @@ type UpsertSubscriptionRow struct { } func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) { - row := q.db.QueryRow(ctx, upsertSubscription, + row := q.db.QueryRowContext(ctx, upsertSubscription, arg.Key, arg.TopicModule, arg.TopicName, @@ -2675,7 +2747,7 @@ type UpsertTopicParams struct { } func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error { - _, err := q.db.Exec(ctx, upsertTopic, + _, err := q.db.ExecContext(ctx, upsertTopic, arg.Topic, arg.Module, arg.Name, diff --git a/backend/controller/sql/sqltest/testing.go b/backend/controller/sql/sqltest/testing.go index 0202d62f8..16e36c2b9 100644 --- a/backend/controller/sql/sqltest/testing.go +++ b/backend/controller/sql/sqltest/testing.go @@ -2,13 +2,13 @@ package sqltest import ( "context" + "database/sql" "os" "path/filepath" "testing" "time" "github.com/alecthomas/assert/v2" - "github.com/jackc/pgx/v5/pgxpool" "github.com/TBD54566975/ftl/backend/controller/sql/databasetesting" "github.com/TBD54566975/ftl/internal/flock" @@ -16,7 +16,7 @@ import ( // OpenForTesting opens a database connection for testing, recreating the // database beforehand. -func OpenForTesting(ctx context.Context, t testing.TB) *pgxpool.Pool { +func OpenForTesting(ctx context.Context, t testing.TB) *sql.DB { t.Helper() // Acquire lock for this DB. lockPath := filepath.Join(os.TempDir(), "ftl-db-test.lock") diff --git a/backend/controller/sql/sqltypes/sqltypes.go b/backend/controller/sql/sqltypes/sqltypes.go new file mode 100644 index 000000000..4fd49d120 --- /dev/null +++ b/backend/controller/sql/sqltypes/sqltypes.go @@ -0,0 +1,32 @@ +package sqltypes + +import ( + "database/sql/driver" + "fmt" + "strings" + "time" +) + +type Duration time.Duration + +func (d Duration) Value() (driver.Value, error) { + return time.Duration(d).String(), nil +} + +func (d *Duration) Scan(value interface{}) error { + switch v := value.(type) { + case string: + // Convert format of hh:mm:ss into format parseable by time.ParseDuration() + v = strings.Replace(v, ":", "h", 1) + v = strings.Replace(v, ":", "m", 1) + v += "s" + dur, err := time.ParseDuration(v) + if err != nil { + return fmt.Errorf("failed to parse duration %q: %w", v, err) + } + *d = Duration(dur) + return nil + default: + return fmt.Errorf("cannot scan duration %v", value) + } +} diff --git a/cmd/ftl-controller/main.go b/cmd/ftl-controller/main.go index 53ca9bc53..1b1ca03f3 100644 --- a/cmd/ftl-controller/main.go +++ b/cmd/ftl-controller/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "database/sql" "fmt" "os" "strconv" @@ -10,7 +11,6 @@ import ( "github.com/alecthomas/kong" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/secretsmanager" - "github.com/jackc/pgx/v5/pgxpool" "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/controller" @@ -51,7 +51,7 @@ func main() { kctx.FatalIfErrorf(err, "failed to initialize observability") // The FTL controller currently only supports DB as a configuration provider/resolver. - conn, err := pgxpool.New(ctx, cli.ControllerConfig.DSN) + conn, err := sql.Open("pgx", cli.ControllerConfig.DSN) kctx.FatalIfErrorf(err) dal, err := dal.New(ctx, conn, encryptors) kctx.FatalIfErrorf(err) diff --git a/cmd/ftl/cmd_box_run.go b/cmd/ftl/cmd_box_run.go index 884f114e2..fee285d3d 100644 --- a/cmd/ftl/cmd_box_run.go +++ b/cmd/ftl/cmd_box_run.go @@ -2,11 +2,12 @@ package main import ( "context" + "database/sql" "fmt" "net/url" "time" - "github.com/jackc/pgx/v5/pgxpool" + _ "github.com/jackc/pgx/v5/stdlib" // pgx driver "github.com/jpillora/backoff" "golang.org/x/sync/errgroup" @@ -57,7 +58,7 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er } // Bring up the DB connection and DAL. - pool, err := pgxpool.New(ctx, config.DSN) + conn, err := sql.Open("pgx", config.DSN) if err != nil { return fmt.Errorf("failed to bring up DB connection: %w", err) } @@ -68,7 +69,7 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er wg := errgroup.Group{} wg.Go(func() error { - return controller.Start(ctx, config, runnerScaling, pool, encryptors) + return controller.Start(ctx, config, runnerScaling, conn, encryptors) }) // Wait for the controller to come up. diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index 2a037b3b9..cb21fadc5 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -2,6 +2,7 @@ package main import ( "context" + "database/sql" "errors" "fmt" "net" @@ -15,7 +16,7 @@ import ( "connectrpc.com/connect" "github.com/alecthomas/types/optional" - "github.com/jackc/pgx/v5/pgxpool" + _ "github.com/jackc/pgx/v5/stdlib" // pgx driver "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl" @@ -147,7 +148,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini controllerCtx = cf.ContextWithSecrets(controllerCtx, sm) // Bring up the DB connection and DAL. - pool, err := pgxpool.New(ctx, config.DSN) + conn, err := sql.Open("pgx", config.DSN) if err != nil { return fmt.Errorf("failed to bring up DB connection: %w", err) } @@ -157,7 +158,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini } wg.Go(func() error { - if err := controller.Start(controllerCtx, config, runnerScaling, pool, encryptors); err != nil { + if err := controller.Start(controllerCtx, config, runnerScaling, conn, encryptors); err != nil { logger.Errorf(err, "controller%d failed: %v", i, err) return fmt.Errorf("controller%d failed: %w", i, err) } diff --git a/common/configuration/dal/dal.go b/common/configuration/dal/dal.go index b8e106e6b..54a2db308 100644 --- a/common/configuration/dal/dal.go +++ b/common/configuration/dal/dal.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/alecthomas/types/optional" - "github.com/jackc/pgx/v5/pgxpool" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/common/configuration/sql" @@ -16,8 +15,8 @@ type DAL struct { db sql.DBI } -func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error) { - dal := &DAL{db: sql.NewDB(pool)} +func New(ctx context.Context, conn sql.ConnI) (*DAL, error) { + dal := &DAL{db: sql.NewDB(conn)} return dal, nil } diff --git a/common/configuration/sql/db.go b/common/configuration/sql/db.go index c4b45fb31..8a1e45d05 100644 --- a/common/configuration/sql/db.go +++ b/common/configuration/sql/db.go @@ -6,15 +6,14 @@ package sql import ( "context" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" + "database/sql" ) type DBTX interface { - Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) - Query(context.Context, string, ...interface{}) (pgx.Rows, error) - QueryRow(context.Context, string, ...interface{}) pgx.Row + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row } func New(db DBTX) *Queries { @@ -25,7 +24,7 @@ type Queries struct { db DBTX } -func (q *Queries) WithTx(tx pgx.Tx) *Queries { +func (q *Queries) WithTx(tx *sql.Tx) *Queries { return &Queries{ db: tx, } diff --git a/common/configuration/sql/models.go b/common/configuration/sql/models.go index e67f71b01..c5bddc7eb 100644 --- a/common/configuration/sql/models.go +++ b/common/configuration/sql/models.go @@ -11,10 +11,12 @@ import ( "time" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" "github.com/google/uuid" + "github.com/sqlc-dev/pqtype" ) type AsyncCallState string @@ -376,16 +378,16 @@ type AsyncCall struct { State AsyncCallState Origin string ScheduledAt time.Time - Request []byte - Response []byte + Request json.RawMessage + Response pqtype.NullRawMessage Error optional.Option[string] RemainingAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] Catching bool ParentRequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage } type Controller struct { @@ -415,7 +417,7 @@ type Deployment struct { ModuleID int64 Key model.DeploymentKey Schema *schema.Module - Labels []byte + Labels json.RawMessage MinReplicas int32 } @@ -467,7 +469,7 @@ type Lease struct { Key leases.Key CreatedAt time.Time ExpiresAt time.Time - Metadata []byte + Metadata pqtype.NullRawMessage } type Module struct { @@ -481,7 +483,7 @@ type ModuleConfiguration struct { CreatedAt time.Time Module optional.Option[string] Name string - Value []byte + Value json.RawMessage } type ModuleSecret struct { @@ -509,7 +511,7 @@ type Runner struct { Endpoint string ModuleName optional.Option[string] DeploymentID optional.Option[int64] - Labels []byte + Labels json.RawMessage } type Topic struct { @@ -530,7 +532,7 @@ type TopicEvent struct { Payload []byte Caller optional.Option[string] RequestKey optional.Option[string] - TraceContext []byte + TraceContext pqtype.NullRawMessage } type TopicSubscriber struct { @@ -541,8 +543,8 @@ type TopicSubscriber struct { DeploymentID int64 Sink schema.RefKey RetryAttempts int32 - Backoff time.Duration - MaxBackoff time.Duration + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration CatchVerb optional.Option[schema.RefKey] } diff --git a/common/configuration/sql/querier.go b/common/configuration/sql/querier.go index adfbe2f0b..b3e7b2224 100644 --- a/common/configuration/sql/querier.go +++ b/common/configuration/sql/querier.go @@ -6,16 +6,17 @@ package sql import ( "context" + "encoding/json" "github.com/alecthomas/types/optional" ) type Querier interface { - GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) + GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) (json.RawMessage, error) GetModuleSecretURL(ctx context.Context, module optional.Option[string], name string) (string, error) ListModuleConfiguration(ctx context.Context) ([]ModuleConfiguration, error) ListModuleSecrets(ctx context.Context) ([]ModuleSecret, error) - SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error + SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value json.RawMessage) error SetModuleSecretURL(ctx context.Context, module optional.Option[string], name string, url string) error UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error UnsetModuleSecret(ctx context.Context, module optional.Option[string], name string) error diff --git a/common/configuration/sql/queries.sql.go b/common/configuration/sql/queries.sql.go index f611ce598..b20dc48fc 100644 --- a/common/configuration/sql/queries.sql.go +++ b/common/configuration/sql/queries.sql.go @@ -7,6 +7,7 @@ package sql import ( "context" + "encoding/json" "github.com/alecthomas/types/optional" ) @@ -21,9 +22,9 @@ ORDER BY module NULLS LAST LIMIT 1 ` -func (q *Queries) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) { - row := q.db.QueryRow(ctx, getModuleConfiguration, module, name) - var value []byte +func (q *Queries) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) (json.RawMessage, error) { + row := q.db.QueryRowContext(ctx, getModuleConfiguration, module, name) + var value json.RawMessage err := row.Scan(&value) return value, err } @@ -39,7 +40,7 @@ LIMIT 1 ` func (q *Queries) GetModuleSecretURL(ctx context.Context, module optional.Option[string], name string) (string, error) { - row := q.db.QueryRow(ctx, getModuleSecretURL, module, name) + row := q.db.QueryRowContext(ctx, getModuleSecretURL, module, name) var url string err := row.Scan(&url) return url, err @@ -52,7 +53,7 @@ ORDER BY module, name ` func (q *Queries) ListModuleConfiguration(ctx context.Context) ([]ModuleConfiguration, error) { - rows, err := q.db.Query(ctx, listModuleConfiguration) + rows, err := q.db.QueryContext(ctx, listModuleConfiguration) if err != nil { return nil, err } @@ -71,6 +72,9 @@ func (q *Queries) ListModuleConfiguration(ctx context.Context) ([]ModuleConfigur } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -84,7 +88,7 @@ ORDER BY module, name ` func (q *Queries) ListModuleSecrets(ctx context.Context) ([]ModuleSecret, error) { - rows, err := q.db.Query(ctx, listModuleSecrets) + rows, err := q.db.QueryContext(ctx, listModuleSecrets) if err != nil { return nil, err } @@ -103,6 +107,9 @@ func (q *Queries) ListModuleSecrets(ctx context.Context) ([]ModuleSecret, error) } items = append(items, i) } + if err := rows.Close(); err != nil { + return nil, err + } if err := rows.Err(); err != nil { return nil, err } @@ -115,8 +122,8 @@ VALUES ($1, $2, $3) ON CONFLICT ((COALESCE(module, '')), name) DO UPDATE SET value = $3 ` -func (q *Queries) SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value []byte) error { - _, err := q.db.Exec(ctx, setModuleConfiguration, module, name, value) +func (q *Queries) SetModuleConfiguration(ctx context.Context, module optional.Option[string], name string, value json.RawMessage) error { + _, err := q.db.ExecContext(ctx, setModuleConfiguration, module, name, value) return err } @@ -127,7 +134,7 @@ ON CONFLICT ((COALESCE(module, '')), name) DO UPDATE SET url = $3 ` func (q *Queries) SetModuleSecretURL(ctx context.Context, module optional.Option[string], name string, url string) error { - _, err := q.db.Exec(ctx, setModuleSecretURL, module, name, url) + _, err := q.db.ExecContext(ctx, setModuleSecretURL, module, name, url) return err } @@ -137,7 +144,7 @@ WHERE COALESCE(module, '') = COALESCE($1, '') AND name = $2 ` func (q *Queries) UnsetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) error { - _, err := q.db.Exec(ctx, unsetModuleConfiguration, module, name) + _, err := q.db.ExecContext(ctx, unsetModuleConfiguration, module, name) return err } @@ -147,6 +154,6 @@ WHERE COALESCE(module, '') = COALESCE($1, '') AND name = $2 ` func (q *Queries) UnsetModuleSecret(ctx context.Context, module optional.Option[string], name string) error { - _, err := q.db.Exec(ctx, unsetModuleSecret, module, name) + _, err := q.db.ExecContext(ctx, unsetModuleSecret, module, name) return err } diff --git a/go.mod b/go.mod index 494f767c5..d319ae82f 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/radovskyb/watcher v1.0.7 github.com/rs/cors v1.11.0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 + github.com/sqlc-dev/pqtype v0.3.0 github.com/swaggest/jsonschema-go v0.3.72 github.com/tink-crypto/tink-go/v2 v2.2.0 github.com/titanous/json5 v1.0.0 @@ -127,7 +128,7 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/lib/pq v1.10.9 // indirect + github.com/lib/pq v1.10.9 github.com/pelletier/go-toml v1.9.5 // indirect github.com/puzpuzpuz/xsync/v3 v3.4.0 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect diff --git a/go.sum b/go.sum index 187085ef5..7a347a6f2 100644 --- a/go.sum +++ b/go.sum @@ -227,6 +227,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/sourcegraph/jsonrpc2 v0.2.0 h1:KjN/dC4fP6aN9030MZCJs9WQbTOjWHhrtKVpzzSrr/U= github.com/sourcegraph/jsonrpc2 v0.2.0/go.mod h1:ZafdZgk/axhT1cvZAPOhw+95nz2I/Ra5qMlU4gTRwIo= +github.com/sqlc-dev/pqtype v0.3.0 h1:b09TewZ3cSnO5+M1Kqq05y0+OjqIptxELaSayg7bmqk= +github.com/sqlc-dev/pqtype v0.3.0/go.mod h1:oyUjp5981ctiL9UYvj1bVvCKi8OXkCa0u645hce7CAs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= diff --git a/sqlc.yaml b/sqlc.yaml index e234043ab..d1794ddf4 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -12,7 +12,6 @@ sql: gen: go: &gengo package: "sql" - sql_package: "pgx/v5" out: "backend/controller/sql" emit_interface: true query_parameter_limit: 3 @@ -26,11 +25,11 @@ sql: - db_type: "timestamptz" go_type: "time.Time" - db_type: "pg_catalog.interval" - go_type: "time.Duration" + go_type: "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes.Duration" - db_type: "pg_catalog.interval" nullable: true go_type: - type: "optional.Option[time.Duration]" + type: "optional.Option[sqltypes.Duration]" - db_type: "module_schema_pb" go_type: "*github.com/TBD54566975/ftl/backend/schema.Module" - db_type: "timestamptz"