Skip to content

Commit

Permalink
use worker's dbClient in vr.setState; use same context for worker pool drain()
Browse files Browse the repository at this point in the history

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Jan 1, 2025
1 parent e054980 commit b295b89
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {
err = vterrors.Wrapf(err, TerminalErrorIndicator)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, nil, err.Error()); errSetState != nil {
log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err)
return err // yes, err and not errSetState.
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
if _, err := vc.vr.dbClient.Execute(buf.String()); err != nil {
return err
}
if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Copying, ""); err != nil {
if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Copying, nil, ""); err != nil {
return err
}
vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase started for %d table(s)", len(plan.TargetTables)))
Expand All @@ -265,7 +265,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
}
}
} else {
if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, "There is nothing to replicate"); err != nil {
if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, nil, "There is nothing to replicate"); err != nil {
return err
}
}
Expand Down
15 changes: 10 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (vp *vplayer) play(ctx context.Context) error {
if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) {
log.Infof("Stop position %v already reached: %v", vp.startPos, vp.stopPos)
if vp.saveStop {
return vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
return vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, nil, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
}
return nil
}
Expand Down Expand Up @@ -265,7 +265,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
log.Errorf("======= QQQ applying events. err=%v", err)
err = errors.Join(err, vp.applyEvents(ctx, relay, parallelPool))
log.Errorf("======= QQQ about to drain. err=%v", err)
err = errors.Join(err, parallelPool.drain(context.Background()))
err = errors.Join(err, parallelPool.drain(ctx))
log.Errorf("======= QQQ drain complete. err=%v", err)
applyErr <- err
countCommits := 0
Expand Down Expand Up @@ -316,7 +316,12 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
}

// updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
func (vp *vplayer) updatePos(ctx context.Context, ts int64, queryFunc func(ctx context.Context, sql string) (*sqltypes.Result, error)) (posReached bool, err error) {
func (vp *vplayer) updatePos(
ctx context.Context,
ts int64,
queryFunc func(ctx context.Context, sql string) (*sqltypes.Result, error),
dbClient *vdbClient,
) (posReached bool, err error) {
vp.posMu.Lock()
defer vp.posMu.Unlock()

Expand All @@ -338,7 +343,7 @@ func (vp *vplayer) updatePos(ctx context.Context, ts int64, queryFunc func(ctx c
if posReached {
log.Infof("Stopped at position: %v", vp.stopPos)
if vp.saveStop {
if err := vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, fmt.Sprintf("Stopped at position %v", vp.stopPos)); err != nil {
if err := vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, dbClient, fmt.Sprintf("Stopped at position %v", vp.stopPos)); err != nil {
return false, err
}
}
Expand Down Expand Up @@ -467,7 +472,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog, parallelPoo
// In both cases, now > timeLastSaved. If so, the GTID of the last unsavedEvent
// must be saved.
if time.Since(vp.timeLastSaved) >= idleTimeout && vp.unsavedEvent.Load() != nil {
posReached, err := vp.updatePos(ctx, vp.unsavedEvent.Load().Timestamp, vp.query)
posReached, err := vp.updatePos(ctx, vp.unsavedEvent.Load().Timestamp, vp.query, nil)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newParallelWorkersPool(size int, dbClientGen dbClientGenerator, vp *vplayer
}
return nil, w.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
}
w.dbClient.maxBatchSize = vp.vr.dbClient.maxBatchSize
} else {
w.queryFunc = func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return w.dbClient.ExecuteWithRetry(ctx, sql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (w *parallelWorker) applyQueuedStmtEvent(ctx context.Context, event *binlog

// updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
func (w *parallelWorker) updatePos(ctx context.Context, ts int64) (posReached bool, err error) {
return w.vp.updatePos(ctx, ts, w.queryFunc)
return w.vp.updatePos(ctx, ts, w.queryFunc, w.dbClient)
}

// updateFKCheck updates the @@session.foreign_key_checks variable based on the binlog row event flags.
Expand Down Expand Up @@ -453,7 +453,7 @@ func (w *parallelWorker) applyApplicableQueuedEvent(ctx context.Context, event *
if _, err := w.updatePos(ctx, event.Timestamp); err != nil {
return err
}
if err := w.vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, fmt.Sprintf("Stopped at DDL %s", event.Statement)); err != nil {
if err := w.vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, w.dbClient, fmt.Sprintf("Stopped at DDL %s", event.Statement)); err != nil {
return err
}
if err := w.dbClient.Commit(); err != nil {
Expand Down Expand Up @@ -517,7 +517,7 @@ func (w *parallelWorker) applyApplicableQueuedEvent(ctx context.Context, event *
switch {
case found && notFound:
// Some were found and some were not found. We can't handle this.
if err := w.vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, "unable to handle journal event: tables were partially matched"); err != nil {
if err := w.vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, w.dbClient, "unable to handle journal event: tables were partially matched"); err != nil {
return err
}
return io.EOF
Expand All @@ -529,7 +529,7 @@ func (w *parallelWorker) applyApplicableQueuedEvent(ctx context.Context, event *
}
log.Infof("Binlog event registering journal event %+v", event.Journal)
if err := w.vp.vr.vre.registerJournal(event.Journal, w.vp.vr.id); err != nil {
if err := w.vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, err.Error()); err != nil {
if err := w.vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, w.dbClient, err.Error()); err != nil {
return err
}
return io.EOF
Expand Down
17 changes: 10 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,9 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
return err
}
if vr.source.StopAfterCopy {
return vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, "Stopped after copy.")
return vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, nil, "Stopped after copy.")
}
if err := vr.setState(binlogdatapb.VReplicationWorkflowState_Running, ""); err != nil {
if err := vr.setState(binlogdatapb.VReplicationWorkflowState_Running, nil, ""); err != nil {
vr.stats.ErrorCounts.Add([]string{"Replicate"}, 1)
return err
}
Expand Down Expand Up @@ -502,28 +502,31 @@ func (vr *vreplicator) insertLog(typ, message string) {
insertLog(vr.dbClient, typ, vr.id, vr.state.String(), message)
}

func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, message string) error {
func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, dbClient *vdbClient, message string) error {
if message != "" {
vr.stats.History.Add(&binlogplayer.StatsHistoryRecord{
Time: time.Now(),
Message: message,
})
}
if dbClient == nil {
dbClient = vr.dbClient
}
vr.stats.State.Store(state.String())
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
// If we're batching a transaction, then include the state update
// in the current transaction batch.
if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 {
vr.dbClient.AddQueryToTrxBatch(query)
if dbClient.InTransaction && dbClient.maxBatchSize > 0 {
dbClient.AddQueryToTrxBatch(query)
} else { // Otherwise, send it down the wire
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
}
if state == vr.state {
return nil
}
insertLog(vr.dbClient, LogStateChange, vr.id, state.String(), message)
insertLog(dbClient, LogStateChange, vr.id, state.String(), message)
vr.state = state

return nil
Expand Down

0 comments on commit b295b89

Please sign in to comment.