Skip to content

Commit

Permalink
Use postgres lib in replication handler
Browse files Browse the repository at this point in the history
  • Loading branch information
eminano committed Jul 1, 2024
1 parent aaa5dba commit fd282fc
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 222 deletions.
30 changes: 11 additions & 19 deletions pkg/wal/listener/postgres/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,25 @@ func newMockReplicationHandler() *replicationmocks.Handler {
StartReplicationFn: func(context.Context) error { return nil },
GetLSNParserFn: func() replication.LSNParser { return newMockLSNParser() },
SyncLSNFn: func(ctx context.Context, lsn replication.LSN) error { return nil },
ReceiveMessageFn: func(ctx context.Context, i uint64) (replication.Message, error) {
ReceiveMessageFn: func(ctx context.Context, i uint64) (*replication.Message, error) {
return newMockMessage(), nil
},
}
}

func newMockMessage() *replicationmocks.Message {
return &replicationmocks.Message{
GetDataFn: func() *replication.MessageData {
return &replication.MessageData{
LSN: testLSN,
Data: []byte("test-data"),
ReplyRequested: false,
ServerTime: time.Now(),
}
},
func newMockMessage() *replication.Message {
return &replication.Message{
LSN: testLSN,
Data: []byte("test-data"),
ReplyRequested: false,
ServerTime: time.Now(),
}
}

func newMockKeepAliveMessage(replyRequested bool) *replicationmocks.Message {
return &replicationmocks.Message{
GetDataFn: func() *replication.MessageData {
return &replication.MessageData{
LSN: testLSN,
ReplyRequested: replyRequested,
}
},
func newMockKeepAliveMessage(replyRequested bool) *replication.Message {
return &replication.Message{
LSN: testLSN,
ReplyRequested: replyRequested,
}
}

Expand Down
26 changes: 12 additions & 14 deletions pkg/wal/listener/postgres/wal_pg_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Listener struct {

type replicationHandler interface {
StartReplication(ctx context.Context) error
ReceiveMessage(ctx context.Context) (replication.Message, error)
ReceiveMessage(ctx context.Context) (*replication.Message, error)
GetLSNParser() replication.LSNParser
Close() error
}
Expand Down Expand Up @@ -89,46 +89,44 @@ func (l *Listener) listen(ctx context.Context) error {
default:
msg, err := l.replicationHandler.ReceiveMessage(ctx)
if err != nil {
replErr := &replication.Error{}
if errors.Is(err, replication.ErrConnTimeout) || (errors.As(err, &replErr) && replErr.Severity == "WARNING") {
if errors.Is(err, replication.ErrConnTimeout) {
continue
}
return fmt.Errorf("receiving message: %w", err)
}

msgData := msg.GetData()
if msgData == nil {
if msg == nil || msg.Data == nil {
continue
}

l.logger.Trace("", loglib.Fields{
"wal_end": l.lsnParser.ToString(msgData.LSN),
"server_time": msgData.ServerTime,
"wal_data": msgData.Data,
"wal_end": l.lsnParser.ToString(msg.LSN),
"server_time": msg.ServerTime,
"wal_data": msg.Data,
})

if err := l.processWALEvent(ctx, msgData); err != nil {
if err := l.processWALEvent(ctx, msg); err != nil {
return err
}
}
}
}

func (l *Listener) processWALEvent(ctx context.Context, msgData *replication.MessageData) error {
func (l *Listener) processWALEvent(ctx context.Context, msg *replication.Message) error {
// if there's no data, it's a keep alive. If a reply is not requested,
// no need to process this message.
if msgData.Data == nil && !msgData.ReplyRequested {
if msg.Data == nil && !msg.ReplyRequested {
return nil
}

event := &wal.Event{}
if msgData.Data != nil {
if msg.Data != nil {
event.Data = &wal.Data{}
if err := l.walDataDeserialiser(msgData.Data, event.Data); err != nil {
if err := l.walDataDeserialiser(msg.Data, event.Data); err != nil {
return fmt.Errorf("error unmarshaling wal data: %w", err)
}
}
event.CommitPosition = wal.CommitPosition(l.lsnParser.ToString(msgData.LSN))
event.CommitPosition = wal.CommitPosition(l.lsnParser.ToString(msg.LSN))

return l.processEvent(ctx, event)
}
20 changes: 9 additions & 11 deletions pkg/wal/listener/postgres/wal_pg_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import (
func TestListener_Listen(t *testing.T) {
t.Parallel()

emptyMessage := &replicationmocks.Message{
GetDataFn: func() *replication.MessageData {
return nil
},
emptyMessage := &replication.Message{
Data: nil,
}

testDeserialiser := func(_ []byte, out any) error {
Expand Down Expand Up @@ -60,7 +58,7 @@ func TestListener_Listen(t *testing.T) {
name: "ok - message received",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 1 {
doneChan <- struct{}{}
Expand All @@ -83,7 +81,7 @@ func TestListener_Listen(t *testing.T) {
name: "ok - timeout on receive message, retried",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 2 {
doneChan <- struct{}{}
Expand All @@ -108,7 +106,7 @@ func TestListener_Listen(t *testing.T) {
name: "ok - nil msg data",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 1 {
doneChan <- struct{}{}
Expand All @@ -126,7 +124,7 @@ func TestListener_Listen(t *testing.T) {
name: "ok - keep alive",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 1 {
doneChan <- struct{}{}
Expand Down Expand Up @@ -154,7 +152,7 @@ func TestListener_Listen(t *testing.T) {
name: "error - receiving message",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 1 {
doneChan <- struct{}{}
Expand All @@ -172,7 +170,7 @@ func TestListener_Listen(t *testing.T) {
name: "error - processing wal event",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 1 {
doneChan <- struct{}{}
Expand All @@ -190,7 +188,7 @@ func TestListener_Listen(t *testing.T) {
name: "error - deserialising wal event",
replicationHandler: func(doneChan chan struct{}) *replicationmocks.Handler {
h := newMockReplicationHandler()
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (replication.Message, error) {
h.ReceiveMessageFn = func(ctx context.Context, i uint64) (*replication.Message, error) {
defer func() {
if i == 1 {
doneChan <- struct{}{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (h *Handler) StartReplication(ctx context.Context) error {
return h.inner.StartReplication(ctx)
}

func (h *Handler) ReceiveMessage(ctx context.Context) (msg replication.Message, err error) {
func (h *Handler) ReceiveMessage(ctx context.Context) (*replication.Message, error) {
return h.inner.ReceiveMessage(ctx)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/wal/replication/mocks/mock_replication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type Handler struct {
StartReplicationFn func(context.Context) error
ReceiveMessageFn func(context.Context, uint64) (replication.Message, error)
ReceiveMessageFn func(context.Context, uint64) (*replication.Message, error)
SyncLSNFn func(context.Context, replication.LSN) error
DropReplicationSlotFn func(ctx context.Context) error
GetLSNParserFn func() replication.LSNParser
Expand All @@ -24,7 +24,7 @@ func (m *Handler) StartReplication(ctx context.Context) error {
return m.StartReplicationFn(ctx)
}

func (m *Handler) ReceiveMessage(ctx context.Context) (replication.Message, error) {
func (m *Handler) ReceiveMessage(ctx context.Context) (*replication.Message, error) {
atomic.AddUint64(&m.ReceiveMessageCalls, 1)
return m.ReceiveMessageFn(ctx, m.GetReceiveMessageCalls())
}
Expand Down
15 changes: 0 additions & 15 deletions pkg/wal/replication/mocks/mock_replication_message.go

This file was deleted.

30 changes: 0 additions & 30 deletions pkg/wal/replication/postgres/pg_replication_errors.go

This file was deleted.

Loading

0 comments on commit fd282fc

Please sign in to comment.