From a9e72a992a749429932d2152c529c3e1336761b2 Mon Sep 17 00:00:00 2001 From: eminano Date: Tue, 1 Oct 2024 10:14:18 +0200 Subject: [PATCH 1/2] Allow to define a version finder that uses LSN --- .../processor/translator/wal_translator.go | 29 +++++++++++----- .../translator/wal_translator_test.go | 33 +++++++++++++++---- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/pkg/wal/processor/translator/wal_translator.go b/pkg/wal/processor/translator/wal_translator.go index b9c707d..b215681 100644 --- a/pkg/wal/processor/translator/wal_translator.go +++ b/pkg/wal/processor/translator/wal_translator.go @@ -29,7 +29,7 @@ type Translator struct { skipSchemaEvent schemaEventFilter schemaLogStore schemalog.Store idFinder columnFinder - versionFinder columnFinder + versionFinder columnFinderWithErr } type walToLogEntryAdapter func(*wal.Data) (*schemalog.LogEntry, error) @@ -41,13 +41,16 @@ type Config struct { // configurable filters that allow the user of this library to have flexibility // when processing and translating the wal event data type ( - dataEventFilter func(*wal.Data) bool - schemaEventFilter func(*schemalog.LogEntry) bool - columnFinder func(*schemalog.Column, *schemalog.Table) bool + dataEventFilter func(*wal.Data) bool + schemaEventFilter func(*schemalog.LogEntry) bool + columnFinder func(*schemalog.Column, *schemalog.Table) bool + columnFinderWithErr func(*schemalog.Column, *schemalog.Table) (bool, error) ) type Option func(t *Translator) +var ErrUseLSN = errors.New("use LSN for as event version") + // New will return a translator processor wrapper that will inject pgstream // metadata into the wal data events before passing them over to the processor // on input. By default, all schemas are processed and the pgstream identity @@ -86,7 +89,7 @@ func WithIDFinder(idFinder columnFinder) Option { } } -func WithVersionFinder(versionFinder columnFinder) Option { +func WithVersionFinder(versionFinder columnFinderWithErr) Option { return func(t *Translator) { t.versionFinder = versionFinder } @@ -231,10 +234,18 @@ func (t *Translator) fillEventMetadata(event *wal.Data, log *schemalog.LogEntry, continue } - if t.versionFinder != nil && t.versionFinder(col, tbl) && !foundVersion { - foundVersion = true - event.Metadata.InternalColVersion = col.PgstreamID - continue + if t.versionFinder != nil && !foundVersion { + isVersionCol, err := t.versionFinder(col, tbl) + if err != nil && errors.Is(err, ErrUseLSN) { + foundVersion = true + event.Metadata.InternalColVersion = "" + continue + } + if isVersionCol { + foundVersion = true + event.Metadata.InternalColVersion = col.PgstreamID + continue + } } } diff --git a/pkg/wal/processor/translator/wal_translator_test.go b/pkg/wal/processor/translator/wal_translator_test.go index 99714dc..043a702 100644 --- a/pkg/wal/processor/translator/wal_translator_test.go +++ b/pkg/wal/processor/translator/wal_translator_test.go @@ -193,7 +193,7 @@ func TestTranslator_ProcessWALEvent(t *testing.T) { skipDataEvent: func(d *wal.Data) bool { return false }, skipSchemaEvent: func(*schemalog.LogEntry) bool { return false }, idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" }, - versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil }, walToLogEntryAdapter: func(d *wal.Data) (*schemalog.LogEntry, error) { return testLogEntry, nil }, } @@ -227,7 +227,7 @@ func TestTranslator_translate(t *testing.T) { data *wal.Data store schemalog.Store idFinder columnFinder - versionFinder columnFinder + versionFinder columnFinderWithErr wantData *wal.Data wantErr error @@ -249,7 +249,7 @@ func TestTranslator_translate(t *testing.T) { }, data: newTestDataEvent("I").Data, idFinder: primaryKeyFinder, - versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil }, wantData: newTestDataEventWithMetadata("I").Data, wantErr: nil, @@ -264,7 +264,7 @@ func TestTranslator_translate(t *testing.T) { }, data: newTestDataEvent("I").Data, idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" }, - versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil }, wantData: newTestDataEventWithMetadata("I").Data, wantErr: nil, @@ -287,6 +287,25 @@ func TestTranslator_translate(t *testing.T) { }(), wantErr: nil, }, + { + name: "ok - version finder provided with use LSN error", + store: &schemalogmocks.Store{ + FetchFn: func(ctx context.Context, schemaName string, ackedOnly bool) (*schemalog.LogEntry, error) { + require.Equal(t, testSchemaName, schemaName) + return newTestLogEntry(), nil + }, + }, + data: newTestDataEvent("I").Data, + idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return false, ErrUseLSN }, + + wantData: func() *wal.Data { + d := newTestDataEventWithMetadata("I").Data + d.Metadata.InternalColVersion = "" + return d + }(), + wantErr: nil, + }, { name: "error - fetching schema log entry", store: &schemalogmocks.Store{ @@ -342,7 +361,7 @@ func TestTranslator_translate(t *testing.T) { }, data: newTestDataEvent("I").Data, idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return false }, - versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return false }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return false, nil }, wantData: func() *wal.Data { d := newTestDataEvent("I").Data @@ -364,7 +383,7 @@ func TestTranslator_translate(t *testing.T) { }, data: newTestDataEvent("I").Data, idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" }, - versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return false }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return false, nil }, wantData: func() *wal.Data { d := newTestDataEvent("I").Data @@ -393,7 +412,7 @@ func TestTranslator_translate(t *testing.T) { return d }(), idFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-1" }, - versionFinder: func(c *schemalog.Column, _ *schemalog.Table) bool { return c.Name == "col-2" }, + versionFinder: func(c *schemalog.Column, _ *schemalog.Table) (bool, error) { return c.Name == "col-2", nil }, wantData: func() *wal.Data { d := newTestDataEventWithMetadata("I").Data From 35e08cb94089e8fcbd7b3c04c411a8fd88006072 Mon Sep 17 00:00:00 2001 From: eminano Date: Tue, 1 Oct 2024 10:21:19 +0200 Subject: [PATCH 2/2] Correct typo --- pkg/wal/processor/translator/wal_translator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/wal/processor/translator/wal_translator.go b/pkg/wal/processor/translator/wal_translator.go index b215681..dc5213a 100644 --- a/pkg/wal/processor/translator/wal_translator.go +++ b/pkg/wal/processor/translator/wal_translator.go @@ -49,7 +49,7 @@ type ( type Option func(t *Translator) -var ErrUseLSN = errors.New("use LSN for as event version") +var ErrUseLSN = errors.New("use LSN as event version") // New will return a translator processor wrapper that will inject pgstream // metadata into the wal data events before passing them over to the processor