Skip to content

Commit

Permalink
refactor: switch from pgx5 to stdlib sql driver (#2299)
Browse files Browse the repository at this point in the history
This will allow us to trivially add OTEL instrumentation.
  • Loading branch information
alecthomas authored Aug 8, 2024
1 parent 9679213 commit bdc2dae
Show file tree
Hide file tree
Showing 33 changed files with 446 additions and 305 deletions.
16 changes: 8 additions & 8 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
sha "crypto/sha256"
"database/sql"
"encoding/binary"
"encoding/json"
"errors"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -147,7 +147,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, pool *pgxpool.Pool, encryptors *dal.Encryptors) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling, conn *sql.DB, encryptors *dal.Encryptors) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -168,7 +168,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, pool, config, runnerScaling, encryptors)
svc, err := New(ctx, conn, config, runnerScaling, encryptors)
if err != nil {
return err
}
Expand Down Expand Up @@ -226,7 +226,7 @@ type ControllerListListener interface {
}

type Service struct {
pool *pgxpool.Pool
conn *sql.DB
dal *dal.DAL
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink
Expand All @@ -250,7 +250,7 @@ type Service struct {
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) {
func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling.RunnerScaling, encryptors *dal.Encryptors) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand All @@ -264,15 +264,15 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s
config.ControllerTimeout = time.Second * 5
}

db, err := dal.New(ctx, pool, encryptors)
db, err := dal.New(ctx, conn, encryptors)
if err != nil {
return nil, fmt.Errorf("failed to create DAL: %w", err)
}

svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
dal: db,
pool: pool,
conn: conn,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
Expand All @@ -283,7 +283,7 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, pool, svc.tasks, svc.callWithRequest)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, conn, svc.tasks, svc.callWithRequest)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

Expand Down
6 changes: 3 additions & 3 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cronjobs

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
"github.com/benbjohnson/clock"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jpillora/backoff"
"github.com/serialx/hashring"

Expand Down Expand Up @@ -97,8 +97,8 @@ type Service struct {
hashRingState atomic.Value[*hashRingState]
}

func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, pool *pgxpool.Pool, scheduler Scheduler, call ExecuteCallFunc) *Service {
return NewForTesting(ctx, key, requestSource, config, dal.New(pool), scheduler, call, clock.New())
func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, conn *sql.DB, scheduler Scheduler, call ExecuteCallFunc) *Service {
return NewForTesting(ctx, key, requestSource, config, dal.New(conn), scheduler, call, clock.New())
}

func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service {
Expand Down
9 changes: 4 additions & 5 deletions backend/controller/cronjobs/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"fmt"
"time"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/TBD54566975/ftl/backend/controller/cronjobs/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -19,8 +18,8 @@ type DAL struct {
db sql.DBI
}

func New(pool *pgxpool.Pool) *DAL {
return &DAL{db: sql.NewDB(pool)}
func New(conn sql.ConnI) *DAL {
return &DAL{db: sql.NewDB(conn)}
}

func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob {
Expand Down Expand Up @@ -92,7 +91,7 @@ func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time)

// GetStaleCronJobs returns a list of cron jobs that have been executing longer than the duration
func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) {
rows, err := d.db.GetStaleCronJobs(ctx, duration)
rows, err := d.db.GetStaleCronJobs(ctx, sqltypes.Duration(duration))
if err != nil {
return nil, fmt.Errorf("failed to get stale cron jobs: %w", dalerrs.TranslatePGError(err))
}
Expand Down
13 changes: 6 additions & 7 deletions backend/controller/cronjobs/sql/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 14 additions & 12 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/controller/cronjobs/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 17 additions & 6 deletions backend/controller/cronjobs/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bdc2dae

Please sign in to comment.