Commit

Merge pull request #60 from xataio/update-lib-hooks

Update lib hooks

eminano authored Aug 6, 2024
2 parents 7418324 + 1d848fb commit b56dba5
Showing 13 changed files with 116 additions and 79 deletions.
2 changes: 1 addition & 1 deletion cmd/config.go
@@ -169,7 +169,7 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {
Store: opensearch.Config{
URL: searchStore,
},
Retrier: &search.StoreRetryConfig{
Retrier: search.StoreRetryConfig{
Backoff: parseBackoffConfig("PGSTREAM_SEARCH_STORE"),
},
}
2 changes: 1 addition & 1 deletion go.mod
@@ -10,7 +10,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/jackc/pgx/v5 v5.6.0
github.com/labstack/echo/v4 v4.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/pterm/pterm v0.12.79
4 changes: 2 additions & 2 deletions go.sum
@@ -123,8 +123,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
2 changes: 0 additions & 2 deletions pkg/schemalog/log_entry.go
@@ -64,8 +64,6 @@ func (m *LogEntry) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal([]byte(schemaStr), &m.Schema); err != nil {
return err
}
default:
panic(fmt.Sprintf("unmarshal LogEntry, got unexpected key when unmarshalling: %s", k))
}
}

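Dropping the `default` branch means `LogEntry.UnmarshalJSON` now skips keys it does not recognise instead of panicking. A minimal sketch of the new behaviour, assuming a payload that carries an extra, unknown field:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/xataio/pgstream/pkg/schemalog"
)

func main() {
	// Hypothetical payload: "future_field" is not a key the decoder knows about.
	// Previously this hit the removed default branch and panicked; now it is ignored.
	payload := []byte(`{"future_field": true}`)

	var entry schemalog.LogEntry
	if err := json.Unmarshal(payload, &entry); err != nil {
		fmt.Println("unmarshal error:", err) // genuine decoding errors are still returned
		return
	}
	fmt.Println("decoded log entry despite the unknown key")
}
```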
2 changes: 1 addition & 1 deletion pkg/stream/config.go
@@ -50,7 +50,7 @@ type KafkaProcessorConfig struct {
type SearchProcessorConfig struct {
Indexer search.IndexerConfig
Store opensearch.Config
Retrier *search.StoreRetryConfig
Retrier search.StoreRetryConfig
}

type WebhookProcessorConfig struct {
5 changes: 1 addition & 4 deletions pkg/stream/stream_run.go
@@ -112,10 +112,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, meter metric
if err != nil {
return err
}
if config.Processor.Search.Retrier != nil {
logger.Debug("using retry logic with search store...")
searchStore = search.NewStoreRetrier(searchStore, config.Processor.Search.Retrier, search.WithStoreLogger(logger))
}
searchStore = search.NewStoreRetrier(searchStore, config.Processor.Search.Retrier, search.WithStoreLogger(logger))

searchIndexer := search.NewBatchIndexer(ctx,
config.Processor.Search.Indexer,
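With the nil check removed, `Run` always wraps the search store in a retrier, and the retry settings travel by value. A sketch of the resulting wiring, assuming the opensearch store satisfies the `search.Store` interface (as it does in `Run`), with the store URL as a placeholder and import paths inferred from the repository layout:

```go
package main

import (
	"github.com/xataio/pgstream/pkg/wal/processor/search"
	"github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
)

func main() {
	// Placeholder URL; opensearch.NewStore and its Config appear earlier in this diff.
	store, err := opensearch.NewStore(opensearch.Config{URL: "http://localhost:9200"})
	if err != nil {
		panic(err)
	}

	// StoreRetryConfig is now a plain value: its zero value is valid, and the
	// retrier is applied unconditionally, mirroring the simplified Run wiring.
	retrier := search.NewStoreRetrier(store, search.StoreRetryConfig{})
	_ = retrier
}
```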
31 changes: 11 additions & 20 deletions pkg/wal/processor/search/opensearch/opensearch_adapter.go
@@ -5,44 +5,35 @@ package opensearch
import (
"encoding/json"
"fmt"
"strings"

"github.com/xataio/pgstream/internal/es"
"github.com/xataio/pgstream/pkg/schemalog"
"github.com/xataio/pgstream/pkg/wal/processor/search"
)

// Adapter converts from/to search types and opensearch types
type Adapter interface {
SchemaNameToIndex(schemaName string) IndexName
IndexToSchemaName(index string) string
type SearchAdapter interface {
SearchDocToBulkItem(docs search.Document) es.BulkItem
BulkItemsToSearchDocErrs(items []es.BulkItem) []search.DocumentError
RecordToLogEntry(rec map[string]any) (*schemalog.LogEntry, error)
}

type adapter struct {
marshaler func(any) ([]byte, error)
unmarshaler func([]byte, any) error
indexNameAdapter IndexNameAdapter
marshaler func(any) ([]byte, error)
unmarshaler func([]byte, any) error
}

func newDefaultAdapter() *adapter {
func newDefaultAdapter(indexNameAdapter IndexNameAdapter) *adapter {
return &adapter{
marshaler: json.Marshal,
unmarshaler: json.Unmarshal,
indexNameAdapter: indexNameAdapter,
marshaler: json.Marshal,
unmarshaler: json.Unmarshal,
}
}

func (a *adapter) SchemaNameToIndex(schemaName string) IndexName {
return newDefaultIndexName(schemaName)
}

func (a *adapter) IndexToSchemaName(index string) string {
return strings.TrimSuffix(index, "-1")
}

func (a *adapter) SearchDocToBulkItem(doc search.Document) es.BulkItem {
indexName := a.SchemaNameToIndex(doc.Schema)
indexName := a.indexNameAdapter.SchemaNameToIndex(doc.Schema)
item := es.BulkItem{
Doc: doc.Data,
}
@@ -94,13 +85,13 @@ func (a *adapter) bulkItemToSearchDocErr(item es.BulkItem) search.DocumentError
}
switch {
case item.Index != nil:
doc.Document.Schema = a.IndexToSchemaName(item.Index.Index)
doc.Document.Schema = a.indexNameAdapter.IndexToSchemaName(item.Index.Index)
doc.Document.ID = item.Index.ID
if item.Index.Version != nil {
doc.Document.Version = *item.Index.Version
}
case item.Delete != nil:
doc.Document.Schema = a.IndexToSchemaName(item.Delete.Index)
doc.Document.Schema = a.indexNameAdapter.IndexToSchemaName(item.Delete.Index)
doc.Document.ID = item.Delete.ID
if item.Delete.Version != nil {
doc.Document.Version = *item.Delete.Version
32 changes: 26 additions & 6 deletions pkg/wal/processor/search/opensearch/opensearch_index_name.go
@@ -4,8 +4,14 @@ package opensearch

import (
"fmt"
"strings"
)

type IndexNameAdapter interface {
SchemaNameToIndex(schemaName string) IndexName
IndexToSchemaName(index string) string
}

// IndexName represents an opensearch index name constructed from a schema name.
type IndexName interface {
Name() string
@@ -14,33 +20,47 @@ type IndexName interface {
SchemaName() string
}

type indexName struct {
type defaultIndexNameAdapter struct{}

func newDefaultIndexNameAdapter() IndexNameAdapter {
return &defaultIndexNameAdapter{}
}

func (i *defaultIndexNameAdapter) SchemaNameToIndex(schemaName string) IndexName {
return newDefaultIndexName(schemaName)
}

func (i *defaultIndexNameAdapter) IndexToSchemaName(index string) string {
return strings.TrimSuffix(index, "-1")
}

type defaultIndexName struct {
schemaName string
version int
}

func newDefaultIndexName(schemaName string) IndexName {
return &indexName{
return &defaultIndexName{
schemaName: schemaName,
version: 1,
}
}

func (i indexName) SchemaName() string {
func (i defaultIndexName) SchemaName() string {
return i.schemaName
}

// NameWithVersion represents the name of the index with the version number. This should
// generally not be needed, in favour of `Name`.
func (i indexName) NameWithVersion() string {
func (i defaultIndexName) NameWithVersion() string {
return fmt.Sprintf("%s-%d", i.schemaName, i.version)
}

// Name returns the name we should use for querying the index.
func (i *indexName) Name() string {
func (i *defaultIndexName) Name() string {
return i.schemaName
}

func (i *indexName) Version() int {
func (i *defaultIndexName) Version() int {
return i.version
}
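Pulling `SchemaNameToIndex` and `IndexToSchemaName` into an `IndexNameAdapter` interface makes the schema-to-index mapping swappable. A minimal in-package sketch of an alternative adapter (it reuses the unexported default index name constructor, so it only works inside this package); the suffix-trimming rule is illustrative only, not part of this PR:

```go
// versionSuffixAdapter is a hypothetical IndexNameAdapter: it reuses the default
// index name construction but strips any trailing "-<suffix>" when mapping an
// index back to its schema name.
type versionSuffixAdapter struct{}

func (versionSuffixAdapter) SchemaNameToIndex(schemaName string) IndexName {
	return newDefaultIndexName(schemaName)
}

func (versionSuffixAdapter) IndexToSchemaName(index string) string {
	if i := strings.LastIndex(index, "-"); i > 0 {
		return index[:i]
	}
	return index
}
```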
46 changes: 31 additions & 15 deletions pkg/wal/processor/search/opensearch/opensearch_store.go
@@ -16,11 +16,12 @@ import (
)

type Store struct {
logger loglib.Logger
client es.SearchClient
mapper search.Mapper
adapter Adapter
marshaler func(any) ([]byte, error)
logger loglib.Logger
client es.SearchClient
mapper search.Mapper
adapter SearchAdapter
indexNameAdapter IndexNameAdapter
marshaler func(any) ([]byte, error)
}

type Config struct {
@@ -57,12 +58,14 @@ func NewStore(cfg Config, opts ...Option) (*Store, error) {
}

func NewStoreWithClient(client es.SearchClient) *Store {
indexNameAdapter := newDefaultIndexNameAdapter()
return &Store{
logger: loglib.NewNoopLogger(),
client: client,
adapter: newDefaultAdapter(),
mapper: NewPostgresMapper(),
marshaler: json.Marshal,
logger: loglib.NewNoopLogger(),
client: client,
indexNameAdapter: indexNameAdapter,
adapter: newDefaultAdapter(indexNameAdapter),
mapper: NewPostgresMapper(),
marshaler: json.Marshal,
}
}

@@ -72,6 +75,19 @@ func WithLogger(l loglib.Logger) Option {
}
}

func WithMapper(m search.Mapper) Option {
return func(s *Store) {
s.mapper = m
}
}

func WithIndexNameAdapter(a IndexNameAdapter) Option {
return func(s *Store) {
s.indexNameAdapter = a
s.adapter = newDefaultAdapter(a)
}
}

func (s *Store) GetMapper() search.Mapper {
return s.mapper
}
@@ -154,7 +170,7 @@ func (s *Store) SendDocuments(ctx context.Context, docs []search.Document) ([]se
}

func (s *Store) DeleteSchema(ctx context.Context, schemaName string) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
exists, err := s.client.IndexExists(ctx, index.NameWithVersion())
if err != nil {
return mapError(err)
@@ -184,7 +200,7 @@ func (s *Store) DeleteSchema(ctx context.Context, schemaName string) error {
}

func (s *Store) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
if err := s.deleteTableDocuments(ctx, index, tableIDs); err != nil {
return mapError(err)
}
@@ -241,7 +257,7 @@ func (s *Store) getLastSchemaLogEntry(ctx context.Context, schemaName string) (*
}

func (s *Store) schemaExists(ctx context.Context, schemaName string) (bool, error) {
indexName := s.adapter.SchemaNameToIndex(schemaName)
indexName := s.indexNameAdapter.SchemaNameToIndex(schemaName)
exists, err := s.client.IndexExists(ctx, indexName.NameWithVersion())
if err != nil {
return false, mapError(err)
@@ -250,7 +266,7 @@ func (s *Store) schemaExists(ctx context.Context, schemaName string) (bool, erro
}

func (s *Store) createSchema(ctx context.Context, schemaName string) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
err := s.client.CreateIndex(ctx, index.NameWithVersion(), map[string]any{
"mappings": map[string]any{
"dynamic": "strict",
@@ -284,7 +300,7 @@ func (s *Store) createSchema(ctx context.Context, schemaName string) error {
}

func (s *Store) updateMapping(ctx context.Context, schemaName string, logEntry *schemalog.LogEntry, diff *schemalog.SchemaDiff) error {
index := s.adapter.SchemaNameToIndex(schemaName)
index := s.indexNameAdapter.SchemaNameToIndex(schemaName)
if diff != nil {
if err := s.updateMappingAddNewColumns(ctx, index, diff.ColumnsToAdd); err != nil {
return fmt.Errorf("failed to add new columns: %w", mapError(err))
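The new `WithMapper` and `WithIndexNameAdapter` options expose the store's mapper and index naming as injection points; note that `WithIndexNameAdapter` also rebuilds the internal search adapter around the supplied naming. A hedged construction sketch, where the adapter and mapper arguments stand in for caller-supplied implementations and the import paths are inferred from the repository layout:

```go
package example

import (
	"github.com/xataio/pgstream/pkg/wal/processor/search"
	"github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
)

// newSearchStore is a hypothetical helper showing the new functional options.
func newSearchStore(url string, adapter opensearch.IndexNameAdapter, mapper search.Mapper) (*opensearch.Store, error) {
	return opensearch.NewStore(
		opensearch.Config{URL: url},
		opensearch.WithIndexNameAdapter(adapter), // also swaps the internal search adapter
		opensearch.WithMapper(mapper),
	)
}
```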
@@ -506,7 +506,7 @@ func TestStore_getLastSchemaLogEntry(t *testing.T) {
tests := []struct {
name string
client es.SearchClient
adapter Adapter
adapter SearchAdapter
marshaler func(any) ([]byte, error)

wantLogEntry *schemalog.LogEntry
2 changes: 1 addition & 1 deletion pkg/wal/processor/search/search_store_retrier.go
@@ -37,7 +37,7 @@ const (

var errPartialDocumentSend = errors.New("failed to send some or all documents")

func NewStoreRetrier(s Store, cfg *StoreRetryConfig, opts ...StoreOption) *StoreRetrier {
func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreRetrier {
sr := &StoreRetrier{
inner: s,
logger: loglib.NewNoopLogger(),
27 changes: 18 additions & 9 deletions pkg/wal/processor/translator/wal_translator.go
@@ -23,7 +23,8 @@ type Translator struct {
logger loglib.Logger
processor processor.Processor
walToLogEntryAdapter walToLogEntryAdapter
skipSchema schemaFilter
skipDataEvent dataEventFilter
skipSchemaEvent schemaEventFilter
schemaLogStore schemalog.Store
idFinder columnFinder
versionFinder columnFinder
@@ -38,8 +39,9 @@ type Config struct {
// configurable filters that allow the user of this library to have flexibility
// when processing and translating the wal event data
type (
schemaFilter func(string) bool
columnFinder func(*schemalog.Column, *schemalog.Table) bool
dataEventFilter func(*wal.Data) bool
schemaEventFilter func(*schemalog.LogEntry) bool
columnFinder func(*schemalog.Column, *schemalog.Table) bool
)

type Option func(t *Translator)
@@ -62,8 +64,9 @@ func New(cfg *Config, p processor.Processor, opts ...Option) (*Translator, error
processor: p,
schemaLogStore: schemaLogStore,
walToLogEntryAdapter: processor.WalDataToLogEntry,
// by default all schemas are processed
skipSchema: func(s string) bool { return false },
// by default all events are processed
skipDataEvent: func(*wal.Data) bool { return false },
skipSchemaEvent: func(*schemalog.LogEntry) bool { return false },
// by default we look for the primary key to use as identity column
idFinder: primaryKeyFinder,
}
@@ -87,9 +90,15 @@ func WithVersionFinder(versionFinder columnFinder) Option {
}
}

func WithSkipSchema(skipSchema schemaFilter) Option {
func WithSkipSchemaEvent(skip schemaEventFilter) Option {
return func(t *Translator) {
t.skipSchema = skipSchema
t.skipSchemaEvent = skip
}
}

func WithSkipDataEvent(skip dataEventFilter) Option {
return func(t *Translator) {
t.skipDataEvent = skip
}
}

@@ -109,7 +118,7 @@ func (t *Translator) ProcessWALEvent(ctx context.Context, event *wal.Event) erro
}

data := event.Data
if t.skipSchema(data.Schema) {
if t.skipDataEvent(data) {
return nil
}

@@ -125,7 +134,7 @@ func (t *Translator) ProcessWALEvent(ctx context.Context, event *wal.Event) erro
return err
}

if t.skipSchema(logEntry.SchemaName) {
if t.skipSchemaEvent(logEntry) {
return nil
}
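The single schema-name filter is replaced by two independent filters: one over the raw data event and one over the decoded schema log entry. A hedged sketch of how a caller might wire them up; the `cfg` and `proc` arguments and the "internal" schema name are placeholders, and the import paths are inferred from the repository layout:

```go
package example

import (
	"github.com/xataio/pgstream/pkg/schemalog"
	"github.com/xataio/pgstream/pkg/wal"
	"github.com/xataio/pgstream/pkg/wal/processor"
	"github.com/xataio/pgstream/pkg/wal/processor/translator"
)

// newFilteringTranslator is a hypothetical helper showing the new options.
func newFilteringTranslator(cfg *translator.Config, proc processor.Processor) (*translator.Translator, error) {
	return translator.New(cfg, proc,
		// Data events can now be filtered on the full payload, not just the schema name.
		translator.WithSkipDataEvent(func(d *wal.Data) bool {
			return d.Schema == "internal"
		}),
		// Schema events get their own filter over the decoded log entry.
		translator.WithSkipSchemaEvent(func(le *schemalog.LogEntry) bool {
			return le.SchemaName == "internal"
		}),
	)
}
```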
