Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lib hooks #60

Merged
merged 5 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {
Store: opensearch.Config{
URL: searchStore,
},
Retrier: &search.StoreRetryConfig{
Retrier: search.StoreRetryConfig{
Backoff: parseBackoffConfig("PGSTREAM_SEARCH_STORE"),
},
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/jackc/pgx/v5 v5.6.0
github.com/labstack/echo/v4 v4.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/pterm/pterm v0.12.79
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
Expand Down
2 changes: 0 additions & 2 deletions pkg/schemalog/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ func (m *LogEntry) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal([]byte(schemaStr), &m.Schema); err != nil {
return err
}
default:
eminano marked this conversation as resolved.
Show resolved Hide resolved
panic(fmt.Sprintf("unmarshal LogEntry, got unexpected key when unmarshalling: %s", k))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type KafkaProcessorConfig struct {
type SearchProcessorConfig struct {
Indexer search.IndexerConfig
Store opensearch.Config
Retrier *search.StoreRetryConfig
Retrier search.StoreRetryConfig
}

type WebhookProcessorConfig struct {
Expand Down
5 changes: 1 addition & 4 deletions pkg/stream/stream_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric
if err != nil {
return err
}
if config.Processor.Search.Retrier != nil {
logger.Debug("using retry logic with search store...")
searchStore = search.NewStoreRetrier(searchStore, config.Processor.Search.Retrier, search.WithStoreLogger(logger))
}
searchStore = search.NewStoreRetrier(searchStore, config.Processor.Search.Retrier, search.WithStoreLogger(logger))

searchIndexer := search.NewBatchIndexer(ctx,
config.Processor.Search.Indexer,
Expand Down
31 changes: 11 additions & 20 deletions pkg/wal/processor/search/opensearch/opensearch_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,35 @@ package opensearch
import (
"encoding/json"
"fmt"
"strings"

"github.com/xataio/pgstream/internal/es"
"github.com/xataio/pgstream/pkg/schemalog"
"github.com/xataio/pgstream/pkg/wal/processor/search"
)

// Adapter converts from/to search types and opensearch types
type Adapter interface {
SchemaNameToIndex(schemaName string) IndexName
IndexToSchemaName(index string) string
type SearchAdapter interface {
SearchDocToBulkItem(docs search.Document) es.BulkItem
BulkItemsToSearchDocErrs(items []es.BulkItem) []search.DocumentError
RecordToLogEntry(rec map[string]any) (*schemalog.LogEntry, error)
}

type adapter struct {
marshaler func(any) ([]byte, error)
unmarshaler func([]byte, any) error
indexNameAdapter IndexNameAdapter
marshaler func(any) ([]byte, error)
unmarshaler func([]byte, any) error
}

func newDefaultAdapter() *adapter {
func newDefaultAdapter(indexNameAdapter IndexNameAdapter) *adapter {
return &adapter{
marshaler: json.Marshal,
unmarshaler: json.Unmarshal,
indexNameAdapter: indexNameAdapter,
marshaler: json.Marshal,
unmarshaler: json.Unmarshal,
}
}

func (a *adapter) SchemaNameToIndex(schemaName string) IndexName {
return newDefaultIndexName(schemaName)
}

func (a *adapter) IndexToSchemaName(index string) string {
return strings.TrimSuffix(index, "-1")
}

func (a *adapter) SearchDocToBulkItem(doc search.Document) es.BulkItem {
indexName := a.SchemaNameToIndex(doc.Schema)
indexName := a.indexNameAdapter.SchemaNameToIndex(doc.Schema)
item := es.BulkItem{
Doc: doc.Data,
}
Expand Down Expand Up @@ -94,13 +85,13 @@ func (a *adapter) bulkItemToSearchDocErr(item es.BulkItem) search.DocumentError
}
switch {
case item.Index != nil:
doc.Document.Schema = a.IndexToSchemaName(item.Index.Index)
doc.Document.Schema = a.indexNameAdapter.IndexToSchemaName(item.Index.Index)
doc.Document.ID = item.Index.ID
if item.Index.Version != nil {
doc.Document.Version = *item.Index.Version
}
case item.Delete != nil:
doc.Document.Schema = a.IndexToSchemaName(item.Delete.Index)
doc.Document.Schema = a.indexNameAdapter.IndexToSchemaName(item.Delete.Index)
doc.Document.ID = item.Delete.ID
if item.Delete.Version != nil {
doc.Document.Version = *item.Delete.Version
Expand Down
32 changes: 26 additions & 6 deletions pkg/wal/processor/search/opensearch/opensearch_index_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ package opensearch

import (
"fmt"
"strings"
)

type IndexNameAdapter interface {
SchemaNameToIndex(schemaName string) IndexName
IndexToSchemaName(index string) string
}

// IndexName represents an opensearch index name constructed from a schema name.
type IndexName interface {
Name() string
Expand All @@ -14,33 +20,47 @@ type IndexName interface {
SchemaName() string
}

type indexName struct {
type defaultIndexNameAdapter struct{}

func newDefaultIndexNameAdapter() IndexNameAdapter {
return &defaultIndexNameAdapter{}
}

func (i *defaultIndexNameAdapter) SchemaNameToIndex(schemaName string) IndexName {
return newDefaultIndexName(schemaName)
}

func (i *defaultIndexNameAdapter) IndexToSchemaName(index string) string {
return strings.TrimSuffix(index, "-1")
}

type defaultIndexName struct {
schemaName string
version int
}

func newDefaultIndexName(schemaName string) IndexName {
return &indexName{
return &defaultIndexName{
schemaName: schemaName,
version: 1,
}
}

func (i indexName) SchemaName() string {
func (i defaultIndexName) SchemaName() string {
return i.schemaName
}

// NameWithVersion represents the name of the index with the version number. This should
// generally not be needed, in favour of `Name`.
func (i indexName) NameWithVersion() string {
func (i defaultIndexName) NameWithVersion() string {
return fmt.Sprintf("%s-%d", i.schemaName, i.version)
}

// Name returns the name we should use for querying the index.
func (i *indexName) Name() string {
func (i *defaultIndexName) Name() string {
return i.schemaName
}

func (i *indexName) Version() int {
func (i *defaultIndexName) Version() int {
return i.version
}
46 changes: 31 additions & 15 deletions pkg/wal/processor/search/opensearch/opensearch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
)

type Store struct {
logger loglib.Logger
client es.SearchClient
mapper search.Mapper
adapter Adapter
marshaler func(any) ([]byte, error)
logger loglib.Logger
client es.SearchClient
mapper search.Mapper
adapter SearchAdapter
indexNameAdapter IndexNameAdapter
marshaler func(any) ([]byte, error)
}

type Config struct {
Expand Down Expand Up @@ -57,12 +58,14 @@ func NewStore(cfg Config, opts ...Option) (*Store, error) {
}

func NewStoreWithClient(client es.SearchClient) *Store {
indexNameAdapter := newDefaultIndexNameAdapter()
return &Store{
logger: loglib.NewNoopLogger(),
client: client,
adapter: newDefaultAdapter(),
mapper: NewPostgresMapper(),
marshaler: json.Marshal,
logger: loglib.NewNoopLogger(),
client: client,
indexNameAdapter: indexNameAdapter,
adapter: newDefaultAdapter(indexNameAdapter),
mapper: NewPostgresMapper(),
marshaler: json.Marshal,
}
}

Expand All @@ -72,6 +75,19 @@ func WithLogger(l loglib.Logger) Option {
}
}

func WithMapper(m search.Mapper) Option {
return func(s *Store) {
s.mapper = m
}
}

func WithIndexNameAdapter(a IndexNameAdapter) Option {
return func(s *Store) {
s.indexNameAdapter = a
s.adapter = newDefaultAdapter(a)
}
}

func (s *Store) GetMapper() search.Mapper {
return s.mapper
}
Expand Down Expand Up @@ -154,7 +170,7 @@ func (s *Store) SendDocuments(ctx context.Context, docs []search.Document) ([]se
}

func (s *Store) DeleteSchema(ctx context.Context, schemaName string) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
exists, err := s.client.IndexExists(ctx, index.NameWithVersion())
if err != nil {
return mapError(err)
Expand Down Expand Up @@ -184,7 +200,7 @@ func (s *Store) DeleteSchema(ctx context.Context, schemaName string) error {
}

func (s *Store) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
if err := s.deleteTableDocuments(ctx, index, tableIDs); err != nil {
return mapError(err)
}
Expand Down Expand Up @@ -241,7 +257,7 @@ func (s *Store) getLastSchemaLogEntry(ctx context.Context, schemaName string) (*
}

func (s *Store) schemaExists(ctx context.Context, schemaName string) (bool, error) {
indexName := s.adapter.SchemaNameToIndex(schemaName)
indexName := s.indexNameAdapter.SchemaNameToIndex(schemaName)
exists, err := s.client.IndexExists(ctx, indexName.NameWithVersion())
if err != nil {
return false, mapError(err)
Expand All @@ -250,7 +266,7 @@ func (s *Store) schemaExists(ctx context.Context, schemaName string) (bool, erro
}

func (s *Store) createSchema(ctx context.Context, schemaName string) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
err := s.client.CreateIndex(ctx, index.NameWithVersion(), map[string]any{
"mappings": map[string]any{
"dynamic": "strict",
Expand Down Expand Up @@ -284,7 +300,7 @@ func (s *Store) createSchema(ctx context.Context, schemaName string) error {
}

func (s *Store) updateMapping(ctx context.Context, schemaName string, logEntry *schemalog.LogEntry, diff *schemalog.SchemaDiff) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
if diff != nil {
if err := s.updateMappingAddNewColumns(ctx, index, diff.ColumnsToAdd); err != nil {
return fmt.Errorf("failed to add new columns: %w", mapError(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func TestStore_getLastSchemaLogEntry(t *testing.T) {
tests := []struct {
name string
client es.SearchClient
adapter Adapter
adapter SearchAdapter
marshaler func(any) ([]byte, error)

wantLogEntry *schemalog.LogEntry
Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/processor/search/search_store_retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (

var errPartialDocumentSend = errors.New("failed to send some or all documents")

func NewStoreRetrier(s Store, cfg *StoreRetryConfig, opts ...StoreOption) *StoreRetrier {
func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier {
sr := &StoreRetrier{
inner: s,
logger: loglib.NewNoopLogger(),
Expand Down
27 changes: 18 additions & 9 deletions pkg/wal/processor/translator/wal_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type Translator struct {
logger loglib.Logger
processor processor.Processor
walToLogEntryAdapter walToLogEntryAdapter
skipSchema schemaFilter
skipDataEvent dataEventFilter
skipSchemaEvent schemaEventFilter
schemaLogStore schemalog.Store
idFinder columnFinder
versionFinder columnFinder
Expand All @@ -38,8 +39,9 @@ type Config struct {
// configurable filters that allow the user of this library to have flexibility
// when processing and translating the wal event data
type (
schemaFilter func(string) bool
columnFinder func(*schemalog.Column, *schemalog.Table) bool
dataEventFilter func(*wal.Data) bool
schemaEventFilter func(*schemalog.LogEntry) bool
columnFinder func(*schemalog.Column, *schemalog.Table) bool
)

type Option func(t *Translator)
Expand All @@ -62,8 +64,9 @@ func New(cfg *Config, p processor.Processor, opts ...Option) (*Translator, error
processor: p,
schemaLogStore: schemaLogStore,
walToLogEntryAdapter: processor.WalDataToLogEntry,
// by default all schemas are processed
skipSchema: func(s string) bool { return false },
// by default all events are processed
skipDataEvent: func(*wal.Data) bool { return false },
skipSchemaEvent: func(*schemalog.LogEntry) bool { return false },
// by default we look for the primary key to use as identity column
idFinder: primaryKeyFinder,
}
Expand All @@ -87,9 +90,15 @@ func WithVersionFinder(versionFinder columnFinder) Option {
}
}

func WithSkipSchema(skipSchema schemaFilter) Option {
func WithSkipSchemaEvent(skip schemaEventFilter) Option {
return func(t *Translator) {
t.skipSchema = skipSchema
t.skipSchemaEvent = skip
}
}

func WithSkipDataEvent(skip dataEventFilter) Option {
return func(t *Translator) {
t.skipDataEvent = skip
}
}

Expand All @@ -109,7 +118,7 @@ func (t *Translator) ProcessWALEvent(ctx context.Context, event *wal.Event) erro
}

data := event.Data
if t.skipSchema(data.Schema) {
if t.skipDataEvent(data) {
return nil
}

Expand All @@ -125,7 +134,7 @@ func (t *Translator) ProcessWALEvent(ctx context.Context, event *wal.Event) erro
return err
}

if t.skipSchema(logEntry.SchemaName) {
if t.skipSchemaEvent(logEntry) {
return nil
}

Expand Down
Loading