From 1283a0f0398d139587dc57f86abc11c5903dbb4f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:45:48 +0300 Subject: [PATCH 1/3] Online DDL: detect vreplication errors via vreplication_log history Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 16 ++++++++++++++++ go/vt/vttablet/onlineddl/executor.go | 19 +++++++++++++++++++ go/vt/vttablet/onlineddl/schema.go | 14 ++++++++++++++ .../tabletmanager/vreplication/controller.go | 2 +- 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index f7bab109c05..161b1566680 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -80,6 +80,9 @@ var ( ALTER TABLE %s DROP PRIMARY KEY, DROP COLUMN vrepl_col` + alterTableFailedVreplicationStatement = ` + ALTER TABLE %s + ADD UNIQUE KEY test_val_uidx (test_val)` // We will run this query while throttling vreplication alterTableThrottlingStatement = ` ALTER TABLE %s @@ -456,6 +459,19 @@ func TestVreplSchemaChanges(t *testing.T) { onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) // migration will fail again }) + t.Run("failed migration due to vreplication", func(t *testing.T) { + insertRows(t, 2) + uuid := testOnlineDDLStatement(t, alterTableFailedVreplicationStatement, "online", providedUUID, providedMigrationContext, "vtgate", "vrepl_col", "", false) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + message := row["message"].ToString() + assert.Contains(t, message, "vreplication: terminal error:", "migration row: %v", row) + } + }) t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) { // no migrations pending at this time time.Sleep(10 * time.Second) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index aad8417237e..7e27ebfcea7 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -3485,6 +3485,25 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing if err := prototext.Unmarshal([]byte(s.source), s.bls); err != nil { return nil, err } + { + // It's possible that an earlier error was overshadowed by a new non-error `message` values. + // Let's read _vt.vreplication_log to see whether there's any terminal errors in vreplication's history. + query, err := sqlparser.ParseAndBind(sqlReadVReplLogErrors, sqltypes.Int32BindVariable(s.id)) + if err != nil { + return nil, err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return nil, err + } + // The query has LIMIT 1, ie returns at most one row + if row := r.Named().Row(); row != nil { + s.state = binlogdatapb.VReplicationWorkflowState_Error + if message := row.AsString("message", ""); message != "" { + s.message = "vreplication: " + message + } + } + } return s, nil } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 9023639fd00..15b4d5b63e5 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -515,6 +515,20 @@ const ( WHERE workflow=%a ` + sqlReadVReplLogErrors = `SELECT + state, + message + FROM _vt.vreplication_log + WHERE + vrepl_id=%a + AND ( + state='Error' + OR locate ('terminal error:', message) = 1 + ) + ORDER BY + id DESC + LIMIT 1 + ` sqlReadCountCopyState = `SELECT count(*) as cnt FROM diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 575300a685c..deed0f9f4ba 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -305,7 +305,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) || isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { - + err = vterrors.Wrapf(err, "terminal error") if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil { log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err) return err // yes, err and not errSetState. From 0b48df54fbabf2741d4586bd56dc57aa77122c71 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:51:06 +0300 Subject: [PATCH 2/3] adapt test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 420134ab7e3..ccf1ce9119a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1782,7 +1782,7 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, qh.Expect( "alter table t1 add column val2 varchar(128)", "/update _vt.vreplication set message='error applying event: Duplicate", - "/update _vt.vreplication set state='Error', message='error applying event: Duplicate", + "/update _vt.vreplication set state='Error', message='terminal error: error applying event: Duplicate", )) cancel() From 63ef1718310aed9e90087f83908bdd52f63f91ef Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 20 Oct 2024 10:31:20 +0300 Subject: [PATCH 3/3] parameterize 'terminal error' Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 6 +++++- go/vt/vttablet/onlineddl/schema.go | 2 +- go/vt/vttablet/tabletmanager/vreplication/controller.go | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 7e27ebfcea7..22dd9447bb9 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -60,6 +60,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" @@ -3488,7 +3489,10 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing { // It's possible that an earlier error was overshadowed by a new non-error `message` values. // Let's read _vt.vreplication_log to see whether there's any terminal errors in vreplication's history. - query, err := sqlparser.ParseAndBind(sqlReadVReplLogErrors, sqltypes.Int32BindVariable(s.id)) + query, err := sqlparser.ParseAndBind(sqlReadVReplLogErrors, + sqltypes.Int32BindVariable(s.id), + sqltypes.StringBindVariable(vreplication.TerminalErrorIndicator), + ) if err != nil { return nil, err } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 15b4d5b63e5..a30ab6b3ed9 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -523,7 +523,7 @@ const ( vrepl_id=%a AND ( state='Error' - OR locate ('terminal error:', message) = 1 + OR locate (concat(%a, ':'), message) = 1 ) ORDER BY id DESC diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index deed0f9f4ba..7067211ff10 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -47,6 +47,9 @@ const ( // give up and return an error message that the user // can see and act upon if needed. tabletPickerRetries = 5 + + // Prepended to the message to indicate that it is a terminal error. + TerminalErrorIndicator = "terminal error" ) // controller is created by Engine. Members are initialized upfront. @@ -305,7 +308,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) || isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { - err = vterrors.Wrapf(err, "terminal error") + err = vterrors.Wrapf(err, TerminalErrorIndicator) if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil { log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err) return err // yes, err and not errSetState.