Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQLite adapter #8

Merged
merged 7 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions example/storage/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"database/sql"
"fmt"
"os"
"testing"

"github.com/google/uuid"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -30,18 +32,9 @@ var (
}
)

const (
host = "localhost"
port = 5432
user = "postgres"
password = "password"
dbname = "postgres"
)

func TestPostcard_Repositories(t *testing.T) {
conn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
db, err := sql.Open("postgres", conn)
require.NoError(t, err)
postgresDB := testPostgresDB(t)
sqliteDB := testSQLiteDB(t)

testCases := []struct {
name string
Expand All @@ -54,47 +47,63 @@ func TestPostcard_Repositories(t *testing.T) {
{
name: "postgres_simple",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewDefaultSimplePostcardRepository(context.Background(), db)
repo, err := storage.NewDefaultSimplePostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_simple_custom",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewCustomSimplePostcardRepository(context.Background(), db)
repo, err := storage.NewCustomSimplePostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_simple_anonymized",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewSimpleAnonymizingPostcardRepository(context.Background(), db)
repo, err := storage.NewSimpleAnonymizingPostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_mapping",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewDefaultMappingPostgresRepository(context.Background(), db)
repo, err := storage.NewDefaultMappingPostgresRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_mapping",
name: "postgres_mapping_custom",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewCustomMappingPostcardRepository(context.Background(), db)
repo, err := storage.NewCustomMappingPostcardRepository(context.Background(), postgresDB)
maclav3 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
return repo
}(),
},
{
name: "postgres_mapping_anonymized",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewMappingAnonymizingPostcardRepository(context.Background(), db)
repo, err := storage.NewMappingAnonymizingPostcardRepository(context.Background(), postgresDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "sqlite_simple",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewSimpleSQLitePostcardRepository(context.Background(), sqliteDB)
require.NoError(t, err)
return repo
}(),
},
{
name: "sqlite_mapping",
repository: func() eventstore.EventStore[*postcard.Postcard] {
repo, err := storage.NewMappingSQLitePostcardRepository(context.Background(), sqliteDB)
require.NoError(t, err)
return repo
}(),
Expand Down Expand Up @@ -160,3 +169,36 @@ func TestPostcard_Repositories(t *testing.T) {
})
}
}

const (
host = "localhost"
port = 5432
user = "postgres"
password = "password"
dbname = "postgres"
)

func testPostgresDB(t *testing.T) *sql.DB {
conn := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
host,
port,
user,
password,
dbname,
)
postgresDB, err := sql.Open("postgres", conn)
require.NoError(t, err)

return postgresDB
}

func testSQLiteDB(t *testing.T) *sql.DB {
dbFile, err := os.CreateTemp("", "tmp_*.db")
require.NoError(t, err)

sqliteDB, err := sql.Open("sqlite3", dbFile.Name())
require.NoError(t, err)

return sqliteDB
}
15 changes: 15 additions & 0 deletions example/storage/mapping_postgres_eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func NewMappingAnonymizingPostcardRepository(ctx context.Context, db *sql.DB) (e
)
}

func NewMappingSQLitePostcardRepository(ctx context.Context, db *sql.DB) (eventstore.EventStore[*postcard.Postcard], error) {
return eventstore.NewSQLStore[*postcard.Postcard](
ctx,
db,
eventstore.NewMappingSQLiteConfig[*postcard.Postcard](
[]transport.EventMapper[*postcard.Postcard]{
CreatedMapper{},
AddressedMapper{},
WrittenMapper{},
SentMapper{},
},
),
)
}

type Created struct {
ID string `json:"id"`
}
Expand Down
15 changes: 15 additions & 0 deletions example/storage/simple_postgres_eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ func NewSimpleAnonymizingPostcardRepository(ctx context.Context, db *sql.DB) (ev
)
}

func NewSimpleSQLitePostcardRepository(ctx context.Context, db *sql.DB) (eventstore.EventStore[*postcard.Postcard], error) {
return eventstore.NewSQLStore[*postcard.Postcard](
ctx,
db,
eventstore.NewSQLiteConfig[*postcard.Postcard](
[]aggregate.Event[*postcard.Postcard]{
postcard.Created{},
postcard.Addressed{},
postcard.Written{},
postcard.Sent{},
},
),
)
}

type ConstantSecretProvider struct{}

func (c ConstantSecretProvider) SecretForKey(aggregateID aggregate.ID) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ go 1.18
require (
github.com/google/uuid v1.3.0
github.com/lib/pq v1.10.6
github.com/mattn/go-sqlite3 v1.14.16
github.com/stretchr/testify v1.7.0
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
18 changes: 18 additions & 0 deletions pkg/eventstore/sql_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,21 @@ func NewMappingPostgresSQLConfig[T any](
Serializer: transport.NewMappingSerializer[T](transport.JSONMarshaler{}, eventMappers),
}
}

func NewSQLiteConfig[T any](
supportedEvents []aggregate.Event[T],
) SQLConfig[T] {
return SQLConfig[T]{
SchemaAdapter: NewSQLiteSchemaAdapter[T](""),
Serializer: transport.NewSimpleSerializer(transport.JSONMarshaler{}, supportedEvents),
}
}

func NewMappingSQLiteConfig[T any](
eventMappers []transport.EventMapper[T],
) SQLConfig[T] {
return SQLConfig[T]{
SchemaAdapter: NewSQLiteSchemaAdapter[T](""),
Serializer: transport.NewMappingSerializer[T](transport.JSONMarshaler{}, eventMappers),
}
}
52 changes: 52 additions & 0 deletions pkg/eventstore/sql_schema_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package eventstore
krzysztofreczek marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"strings"
)

const (
defaultEventsTableName = "events"
defaultSelectQuery = `
SELECT
aggregate_id,
aggregate_version,
event_name,
event_payload
FROM %s
WHERE aggregate_id = $1 AND aggregate_type = $2
ORDER BY aggregate_version ASC;
`
defaultInsertQuery = `
INSERT INTO %s (
aggregate_id,
aggregate_version,
aggregate_type,
event_name,
event_payload
)
VALUES %s
`
defaultInsertMarkersCount = 5
defaultInsertMarkersPattern = "($%d,$%d,$%d,$%d,$%d),"
)

func defaultInsertMarkers(count int) string {
result := strings.Builder{}

var indices []any
for i := 1; i <= count*defaultInsertMarkersCount; i++ {
indices = append(indices, i)
if i%defaultInsertMarkersCount == 0 {
result.WriteString(
fmt.Sprintf(
defaultInsertMarkersPattern,
indices...,
),
)
indices = nil
}
}

return strings.TrimRight(result.String(), ",")
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,9 @@ package eventstore

import (
"fmt"
"strings"
)

const eventsTableName = "events"

type PostgresSchemaAdapter[A any] struct {
aggregateType string
}

func NewPostgresSchemaAdapter[A any](aggregateType string) PostgresSchemaAdapter[A] {
return PostgresSchemaAdapter[A]{
aggregateType: aggregateType,
}
}

func (a PostgresSchemaAdapter[A]) InitializeSchemaQuery() string {
query := `
const postgresInitializeSchemaQuery = `
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE IF NOT EXISTS %[1]s (
id serial NOT NULL PRIMARY KEY,
Expand All @@ -32,19 +18,23 @@ CREATE TABLE IF NOT EXISTS %[1]s (
CREATE INDEX IF NOT EXISTS idx_aggregate_id ON %[1]s (aggregate_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_aggregate_id_version ON %[1]s (aggregate_id, aggregate_version);
`
return fmt.Sprintf(query, eventsTableName)

type PostgresSchemaAdapter[A any] struct {
aggregateType string
}

func (a PostgresSchemaAdapter[A]) SelectQuery(aggregateID string) (string, []any, error) {
query := `
SELECT
aggregate_id, aggregate_version, event_name, event_payload
FROM "%s"
WHERE aggregate_id = $1 AND aggregate_type = $2
ORDER BY aggregate_version ASC;
`
func NewPostgresSchemaAdapter[A any](aggregateType string) PostgresSchemaAdapter[A] {
return PostgresSchemaAdapter[A]{
aggregateType: aggregateType,
}
}

func (a PostgresSchemaAdapter[A]) InitializeSchemaQuery() string {
return fmt.Sprintf(postgresInitializeSchemaQuery, defaultEventsTableName)
}

query = fmt.Sprintf(query, eventsTableName)
func (a PostgresSchemaAdapter[A]) SelectQuery(aggregateID string) (string, []any, error) {
query := fmt.Sprintf(defaultSelectQuery, defaultEventsTableName)

args := []any{
aggregateID, a.aggregateType,
Expand All @@ -54,14 +44,9 @@ ORDER BY aggregate_version ASC;
}

func (a PostgresSchemaAdapter[A]) InsertQuery(events []storageEvent[A]) (string, []any, error) {
query := `
INSERT INTO %s (aggregate_id, aggregate_version, aggregate_type, event_name, event_payload)
VALUES %s`

query = fmt.Sprintf(query, eventsTableName, defaultInsertMarkers(len(events)))
query := fmt.Sprintf(defaultInsertQuery, defaultEventsTableName, defaultInsertMarkers(len(events)))

var args []any

for _, e := range events {
args = append(
args,
Expand All @@ -76,15 +61,3 @@ VALUES %s`
return query, args, nil

}

func defaultInsertMarkers(count int) string {
result := strings.Builder{}

index := 1
for i := 0; i < count; i++ {
result.WriteString(fmt.Sprintf("($%d,$%d,$%d,$%d,$%d),", index, index+1, index+2, index+3, index+4))
index += 5
}

return strings.TrimRight(result.String(), ",")
}
Loading