Skip to content

Commit

Permalink
Fix: filtered records holding up pipeline with destination batching
Browse files (browse the repository at this point in the history)
  • Loading branch information
hariso committed Nov 22, 2024
1 parent f751508 commit f5a4126
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 13 deletions.
9 changes: 9 additions & 0 deletions pkg/lifecycle/stream/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func (n *DestinationNode) Run(ctx context.Context) (err error) {
if err != nil || msg == nil {
return err
}
if msg.filtered {
n.logger.Info(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

n.logger.Trace(msg.Ctx).Msg("writing record to destination connector")

Expand Down
29 changes: 20 additions & 9 deletions pkg/lifecycle/stream/destination_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type DestinationAckerNode struct {
// queue is used to store messages
queue deque.Deque[*Message]

// m guards access to queue
m sync.Mutex
queueMutex sync.Mutex

// mctx guards access to the contextCtxCancel function
mctx sync.Mutex
Expand Down Expand Up @@ -96,9 +95,9 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) {
return err
}

n.m.Lock()
n.queueMutex.Lock()
n.queue.PushBack(msg)
n.m.Unlock()
n.queueMutex.Unlock()
select {
case signalChan <- struct{}{}:
// triggered the start of listening to acks in worker goroutine
Expand All @@ -116,9 +115,9 @@ func (n *DestinationAckerNode) worker(
) {
handleError := func(msg *Message, err error) {
// push message back to the front of the queue and return error
n.m.Lock()
n.queueMutex.Lock()
n.queue.PushFront(msg)
n.m.Unlock()
n.queueMutex.Unlock()

errChan <- err
}
Expand All @@ -131,13 +130,25 @@ func (n *DestinationAckerNode) worker(
// let's start fetching acks for messages in the queue
for {
// check if there are more messages waiting in the queue
n.m.Lock()
n.queueMutex.Lock()
if n.queue.Len() == 0 {
n.m.Unlock()
n.queueMutex.Unlock()
break
}
msg := n.queue.PopFront()
n.m.Unlock()
n.queueMutex.Unlock()

if msg.filtered {
n.logger.Info(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("acking filtered message")
err := n.handleAck(msg, nil)
if err != nil {
errChan <- err
return
}
continue
}

if len(acks) == 0 {
// Ack can return multiple acks, store them and check the position
Expand Down
4 changes: 3 additions & 1 deletion pkg/lifecycle/stream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ type Message struct {
acked chan struct{}
nacked chan struct{}

filtered bool

// handler is executed when Ack or Nack is called.
handler StatusChangeHandler

// hasNackHandler is true if at least one nack handler was registered.
hasNackHandler bool

// ackNackReturnValue is cached the first time Ack or Nack is executed.
ackNackReturnValue error

// initOnce is guarding the initialization logic of a message.
initOnce sync.Once
// ackNackOnce is guarding the acking/nacking logic of a message.
Expand Down
10 changes: 10 additions & 0 deletions pkg/lifecycle/stream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func (n *MetricsNode) Run(ctx context.Context) error {
return err
}

if msg.filtered {
n.logger.Info(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

msg.RegisterAckHandler(func(msg *Message) error {
n.Histogram.Observe(msg.Record)
return nil
Expand Down
23 changes: 20 additions & 3 deletions pkg/lifecycle/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
return err
}

if msg.filtered {
n.logger.Info(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

executeTime := time.Now()
recsIn := []opencdc.Record{msg.Record}
recsOut := n.Processor.Process(msg.Ctx, recsIn)
Expand All @@ -104,10 +114,17 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
return err
}
case sdk.FilterRecord:
// NB: Ack skipped messages since they've been correctly handled
err := msg.Ack()
n.logger.Info(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("marking message as filtered")
msg.filtered = true

n.logger.Info(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return cerrors.Errorf("failed to ack skipped message: %w", err)
return msg.Nack(err, n.ID())
}
case sdk.ErrorRecord:
err = msg.Nack(v.Error, n.ID())
Expand Down

0 comments on commit f5a4126

Please sign in to comment.