Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: detect vreplication errors via vreplication_log history #16925

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ var (
ALTER TABLE %s
DROP PRIMARY KEY,
DROP COLUMN vrepl_col`
alterTableFailedVreplicationStatement = `
ALTER TABLE %s
ADD UNIQUE KEY test_val_uidx (test_val)`
// We will run this query while throttling vreplication
alterTableThrottlingStatement = `
ALTER TABLE %s
Expand Down Expand Up @@ -456,6 +459,19 @@ func TestVreplSchemaChanges(t *testing.T) {
onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true)
// migration will fail again
})
t.Run("failed migration due to vreplication", func(t *testing.T) {
insertRows(t, 2)
uuid := testOnlineDDLStatement(t, alterTableFailedVreplicationStatement, "online", providedUUID, providedMigrationContext, "vtgate", "vrepl_col", "", false)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
assert.Contains(t, message, "vreplication: terminal error:", "migration row: %v", row)
}
})
t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) {
// no migrations pending at this time
time.Sleep(10 * time.Second)
Expand Down
23 changes: 23 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
Expand Down Expand Up @@ -3485,6 +3486,28 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing
if err := prototext.Unmarshal([]byte(s.source), s.bls); err != nil {
return nil, err
}
{
// It's possible that an earlier error was overshadowed by a new non-error `message` values.
// Let's read _vt.vreplication_log to see whether there's any terminal errors in vreplication's history.
query, err := sqlparser.ParseAndBind(sqlReadVReplLogErrors,
sqltypes.Int32BindVariable(s.id),
sqltypes.StringBindVariable(vreplication.TerminalErrorIndicator),
)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return nil, err
}
// The query has LIMIT 1, ie returns at most one row
if row := r.Named().Row(); row != nil {
s.state = binlogdatapb.VReplicationWorkflowState_Error
if message := row.AsString("message", ""); message != "" {
s.message = "vreplication: " + message
}
}
}
return s, nil
}

Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,20 @@ const (
WHERE
workflow=%a
`
sqlReadVReplLogErrors = `SELECT
state,
message
FROM _vt.vreplication_log
WHERE
vrepl_id=%a
AND (
state='Error'
OR locate (concat(%a, ':'), message) = 1
)
ORDER BY
id DESC
LIMIT 1
`
sqlReadCountCopyState = `SELECT
count(*) as cnt
FROM
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const (
// give up and return an error message that the user
// can see and act upon if needed.
tabletPickerRetries = 5

// Prepended to the message to indicate that it is a terminal error.
TerminalErrorIndicator = "terminal error"
)

// controller is created by Engine. Members are initialized upfront.
Expand Down Expand Up @@ -305,7 +308,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {

err = vterrors.Wrapf(err, TerminalErrorIndicator)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err)
return err // yes, err and not errSetState.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ func TestPlayerDDL(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"alter table t1 add column val2 varchar(128)",
"/update _vt.vreplication set message='error applying event: Duplicate",
"/update _vt.vreplication set state='Error', message='error applying event: Duplicate",
"/update _vt.vreplication set state='Error', message='terminal error: error applying event: Duplicate",
))
cancel()

Expand Down
Loading