Skip to content

Commit

Permalink
Add tx support to querier
Browse files Browse the repository at this point in the history
  • Loading branch information
eminano committed Sep 13, 2024
1 parent e51af1e commit d9c93bc
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 14 deletions.
5 changes: 5 additions & 0 deletions internal/postgres/mocks/mock_pg_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Querier struct {
QueryRowFn func(ctx context.Context, query string, args ...any) postgres.Row
QueryFn func(ctx context.Context, query string, args ...any) (postgres.Rows, error)
ExecFn func(context.Context, string, ...any) (postgres.CommandTag, error)
ExecInTxFn func(context.Context, func(tx postgres.Tx) error) error
CloseFn func(context.Context) error
}

Expand All @@ -27,6 +28,10 @@ func (m *Querier) Exec(ctx context.Context, query string, args ...any) (postgres
return m.ExecFn(ctx, query, args...)
}

func (m *Querier) ExecInTx(ctx context.Context, fn func(tx postgres.Tx) error) error {
return m.ExecInTxFn(ctx, fn)
}

func (m *Querier) Close(ctx context.Context) error {
return m.CloseFn(ctx)
}
27 changes: 14 additions & 13 deletions internal/postgres/pg_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,12 @@ import (
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

type Conn struct {
conn *pgx.Conn
}

type Row interface {
pgx.Row
}

type Rows interface {
pgx.Rows
}

type CommandTag struct {
pgconn.CommandTag
}

func NewConn(ctx context.Context, url string) (*Conn, error) {
pgCfg, err := pgx.ParseConfig(url)
if err != nil {
Expand Down Expand Up @@ -55,6 +42,20 @@ func (c *Conn) Exec(ctx context.Context, query string, args ...any) (CommandTag,
return CommandTag{tag}, mapError(err)
}

func (c *Conn) ExecInTx(ctx context.Context, fn func(Tx) error) error {
tx, err := c.conn.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return mapError(err)
}

if err := fn(tx); err != nil {
tx.Rollback(ctx)
return mapError(err)
}

return tx.Commit(ctx)
}

func (c *Conn) Close(ctx context.Context) error {
return mapError(c.conn.Close(ctx))
}
15 changes: 15 additions & 0 deletions internal/postgres/pg_conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

Expand Down Expand Up @@ -42,6 +43,20 @@ func (c *Pool) Exec(ctx context.Context, query string, args ...any) (CommandTag,
return CommandTag{tag}, mapError(err)
}

func (c *Pool) ExecInTx(ctx context.Context, fn func(Tx) error) error {
tx, err := c.Pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return mapError(err)
}

if err := fn(tx); err != nil {
tx.Rollback(ctx)
return mapError(err)
}

return tx.Commit(ctx)
}

func (c *Pool) Close(_ context.Context) error {
c.Pool.Close()
return nil
Expand Down
25 changes: 24 additions & 1 deletion internal/postgres/pg_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,37 @@

package postgres

import "context"
import (
"context"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

type Querier interface {
Query(ctx context.Context, query string, args ...any) (Rows, error)
QueryRow(ctx context.Context, query string, args ...any) Row
Exec(ctx context.Context, query string, args ...any) (CommandTag, error)
ExecInTx(ctx context.Context, fn func(tx Tx) error) error
Close(ctx context.Context) error
}

type Row interface {
pgx.Row
}

type Rows interface {
pgx.Rows
}

type Tx interface {
pgx.Tx
}

type CommandTag struct {
pgconn.CommandTag
}

type mappedRow struct {
inner Row
}
Expand Down

0 comments on commit d9c93bc

Please sign in to comment.