From b99e150a115f599668b0dbce3557f886ff7eae37 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 9 Mar 2024 20:23:51 -0500 Subject: [PATCH] VReplication: improve reliability of log management (#15374) Signed-off-by: Matt Lord --- go/textutil/strings.go | 31 ++++++ go/textutil/strings_test.go | 77 +++++++++++++++ go/vt/binlog/binlogplayer/binlog_player.go | 8 ++ go/vt/binlog/binlogplayer/dbclient.go | 9 +- go/vt/binlog/binlogplayer/mock_dbclient.go | 14 +++ go/vt/vtctl/workflow/server.go | 3 +- .../tabletmanager/vreplication/engine.go | 12 +-- .../tabletmanager/vreplication/utils.go | 32 +++--- .../tabletmanager/vreplication/utils_test.go | 99 +++++++++++++++++++ .../tabletmanager/vreplication/vcopier.go | 16 +-- .../vreplication/vplayer_flaky_test.go | 3 +- .../tabletmanager/vreplication/vreplicator.go | 28 ++---- 12 files changed, 274 insertions(+), 58 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/vreplication/utils_test.go diff --git a/go/textutil/strings.go b/go/textutil/strings.go index ac35541f52f..616366f0083 100644 --- a/go/textutil/strings.go +++ b/go/textutil/strings.go @@ -17,6 +17,7 @@ limitations under the License. package textutil import ( + "fmt" "net/url" "regexp" "strings" @@ -28,6 +29,13 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +type TruncationLocation int + +const ( + TruncationLocationMiddle TruncationLocation = iota + TruncationLocationEnd +) + var ( delimitedListRegexp = regexp.MustCompile(`[ ,;]+`) SimulatedNullString = sqltypes.NULL.String() @@ -133,3 +141,26 @@ func Title(s string) string { }, s) } + +// TruncateText truncates the provided text, if needed, to the specified maximum +// length using the provided truncation indicator in place of the truncated text +// in the specified location of the original string. +func TruncateText(text string, limit int, location TruncationLocation, indicator string) (string, error) { + ol := len(text) + if ol <= limit { + return text, nil + } + if len(indicator)+2 >= limit { + return "", fmt.Errorf("the truncation indicator is too long for the provided text") + } + switch location { + case TruncationLocationMiddle: + prefix := (limit / 2) - len(indicator) + suffix := (ol - (prefix + len(indicator))) + 1 + return fmt.Sprintf("%s%s%s", text[:prefix], indicator, text[suffix:]), nil + case TruncationLocationEnd: + return text[:limit-(len(indicator)+1)] + indicator, nil + default: + return "", fmt.Errorf("invalid truncation location: %d", location) + } +} diff --git a/go/textutil/strings_test.go b/go/textutil/strings_test.go index 2b43166831c..2ba9851b71c 100644 --- a/go/textutil/strings_test.go +++ b/go/textutil/strings_test.go @@ -17,9 +17,11 @@ limitations under the License. package textutil import ( + "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -208,3 +210,78 @@ func TestTitle(t *testing.T) { }) } } + +func TestTruncateText(t *testing.T) { + defaultLocation := TruncationLocationMiddle + defaultMaxLen := 100 + defaultTruncationIndicator := "..." + + tests := []struct { + name string + text string + maxLen int + location TruncationLocation + truncationIndicator string + want string + wantErr string + }{ + { + name: "no truncation", + text: "hello world", + maxLen: defaultMaxLen, + location: defaultLocation, + want: "hello world", + }, + { + name: "no truncation - exact", + text: strings.Repeat("a", defaultMaxLen), + maxLen: defaultMaxLen, + location: defaultLocation, + want: strings.Repeat("a", defaultMaxLen), + }, + { + name: "barely too long - mid", + text: strings.Repeat("a", defaultMaxLen+1), + truncationIndicator: defaultTruncationIndicator, + maxLen: defaultMaxLen, + location: defaultLocation, + want: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + }, + { + name: "barely too long - end", + text: strings.Repeat("a", defaultMaxLen+1), + truncationIndicator: defaultTruncationIndicator, + maxLen: defaultMaxLen, + location: TruncationLocationEnd, + want: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...", + }, + { + name: "too small", + text: strings.Repeat("a", defaultMaxLen), + truncationIndicator: defaultTruncationIndicator, + maxLen: 4, + location: defaultLocation, + wantErr: "the truncation indicator is too long for the provided text", + }, + { + name: "bad location", + text: strings.Repeat("a", defaultMaxLen+1), + truncationIndicator: defaultTruncationIndicator, + maxLen: defaultMaxLen, + location: 100, + wantErr: "invalid truncation location: 100", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + val, err := TruncateText(tt.text, tt.maxLen, tt.location, tt.truncationIndicator) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.Equal(t, tt.want, val) + require.LessOrEqual(t, len(val), tt.maxLen) + } + }) + } +} diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 6bdc2d70d2d..9a3b3f9c59f 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -43,8 +43,10 @@ import ( "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -65,8 +67,14 @@ var ( BlplTransaction = "Transaction" // BlplBatchTransaction is the key for the stats map. BlplBatchTransaction = "BatchTransaction" + + // Truncate values in the middle to preserve the end of the message which + // typically contains the error text. + TruncationLocation = textutil.TruncationLocationMiddle ) +var TruncationIndicator = fmt.Sprintf(" ... %s ... ", sqlparser.TruncationText) + // Stats is the internal stats of a player. It is a different // structure that is passed in so stats can be collected over the life // of multiple individual players. diff --git a/go/vt/binlog/binlogplayer/dbclient.go b/go/vt/binlog/binlogplayer/dbclient.go index bc96e690b76..61789f345c7 100644 --- a/go/vt/binlog/binlogplayer/dbclient.go +++ b/go/vt/binlog/binlogplayer/dbclient.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" @@ -129,10 +130,14 @@ func LogError(msg string, err error) { // LimitString truncates string to specified size func LimitString(s string, limit int) string { - if len(s) > limit { + ts, err := textutil.TruncateText(s, limit, TruncationLocation, TruncationIndicator) + if err != nil { // Fallback to simple truncation + if len(s) <= limit { + return s + } return s[:limit] } - return s + return ts } func (dc *dbClientImpl) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index abc170ed493..e89c630b4d8 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -244,3 +244,17 @@ func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltype } return results, nil } + +// AddInvariant can be used to customize the behavior of the mock client. +func (dc *MockDBClient) AddInvariant(query string, result *sqltypes.Result) { + dc.expectMu.Lock() + defer dc.expectMu.Unlock() + dc.invariants[query] = result +} + +// RemoveInvariant can be used to customize the behavior of the mock client. +func (dc *MockDBClient) RemoveInvariant(query string) { + dc.expectMu.Lock() + defer dc.expectMu.Unlock() + delete(dc.invariants, query) +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index fc215b84c22..41873a5533c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -923,7 +923,8 @@ ORDER BY if stream.Id > streamLog.StreamId { log.Warningf("Found stream log for nonexistent stream: %+v", streamLog) - break + // This can happen on manual/failed workflow cleanup so keep going. + continue } // stream.Id == streamLog.StreamId diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 2b2937056ac..5a31ff62be7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -433,9 +433,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, err } vre.controllers[id] = ct - if err := insertLogWithParams(vdbc, LogStreamCreate, id, params); err != nil { - return nil, err - } + insertLogWithParams(vdbc, LogStreamCreate, id, params) } return qr, nil case updateQuery: @@ -475,9 +473,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, err } vre.controllers[id] = ct - if err := insertLog(vdbc, LogStateChange, id, params["state"], ""); err != nil { - return nil, err - } + insertLog(vdbc, LogStateChange, id, params["state"], "") } return qr, nil case deleteQuery: @@ -495,9 +491,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) ct.Stop() delete(vre.controllers, id) } - if err := insertLogWithParams(vdbc, LogStreamDelete, id, nil); err != nil { - return nil, err - } + insertLogWithParams(vdbc, LogStreamDelete, id, nil) } if err := dbClient.Begin(); err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go index 21c3a61c9f1..2b80bfb62a2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go @@ -24,6 +24,8 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/textutil" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -33,6 +35,8 @@ import ( const ( vreplicationLogTableName = "vreplication_log" + // This comes from the fact that the message column in the vreplication_log table is of type TEXT. + maxVReplicationLogMessageLen = 65535 ) const ( @@ -82,46 +86,50 @@ func getLastLog(dbClient *vdbClient, vreplID int32) (id int64, typ, state, messa return id, typ, state, message, nil } -func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message string) error { +func insertLog(dbClient *vdbClient, typ string, vreplID int32, state, message string) { // getLastLog returns the last log for a stream. During insertion, if the type/state/message match we do not insert // a new log but increment the count. This prevents spamming of the log table in case the same message is logged continuously. id, _, lastLogState, lastLogMessage, err := getLastLog(dbClient, vreplID) if err != nil { - return err + log.Errorf("Could not insert vreplication_log record because we failed to get the last log record: %v", err) + return } if typ == LogStateChange && state == lastLogState { // handles case where current state is Running, controller restarts after an error and initializes the state Running - return nil + return } var query string if id > 0 && message == lastLogMessage { query = fmt.Sprintf("update %s.vreplication_log set count = count + 1 where id = %d", sidecar.GetIdentifier(), id) } else { buf := sqlparser.NewTrackedBuffer(nil) + if len(message) > maxVReplicationLogMessageLen { + message, err = textutil.TruncateText(message, maxVReplicationLogMessageLen, binlogplayer.TruncationLocation, binlogplayer.TruncationIndicator) + if err != nil { + log.Errorf("Could not insert vreplication_log record because we failed to truncate the message: %v", err) + return + } + } buf.Myprintf("insert into %s.vreplication_log(vrepl_id, type, state, message) values(%s, %s, %s, %s)", sidecar.GetIdentifier(), strconv.Itoa(int(vreplID)), encodeString(typ), encodeString(state), encodeString(message)) query = buf.ParsedQuery().Query } if _, err = dbClient.ExecuteFetch(query, 10000); err != nil { - return fmt.Errorf("could not insert into log table: %v: %v", query, err) + log.Errorf("Could not insert into vreplication_log table: %v: %v", query, err) } - return nil } -// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string -func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) error { +// insertLogWithParams is called when a stream is created. The attributes of the stream are stored as a json string. +func insertLogWithParams(dbClient *vdbClient, action string, vreplID int32, params map[string]string) { var message string if params != nil { obj, _ := json.Marshal(params) message = string(obj) } - if err := insertLog(dbClient, action, vreplID, params["state"], message); err != nil { - return err - } - return nil + insertLog(dbClient, action, vreplID, params["state"], message) } -// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate +// isUnrecoverableError returns true if vreplication cannot recover from the given error and should completely terminate. func isUnrecoverableError(err error) bool { if err == nil { return false diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils_test.go b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go new file mode 100644 index 00000000000..bfe79036f3c --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/textutil" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestInsertLogTruncation(t *testing.T) { + dbClient := binlogplayer.NewMockDBClient(t) + defer dbClient.Close() + dbClient.RemoveInvariant("insert into _vt.vreplication_log") // Otherwise the insert will be ignored + stats := binlogplayer.NewStats() + defer stats.Stop() + vdbClient := newVDBClient(dbClient, stats) + defer vdbClient.Close() + vrID := int32(1) + typ := "Testing" + state := binlogdatapb.VReplicationWorkflowState_Error.String() + + insertStmtf := "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(%d, '%s', '%s', %s)" + + tests := []struct { + message string + expectTruncation bool + }{ + { + message: "Simple message that's not truncated", + }, + { + message: "Simple message that needs to be truncated " + strings.Repeat("a", 80000) + " cuz it's long", + expectTruncation: true, + }, + { + message: "Simple message that doesn't need to be truncated " + strings.Repeat("b", 64000) + " cuz it's not quite too long", + }, + { + message: "Message that is just barely short enough " + strings.Repeat("c", maxVReplicationLogMessageLen-(len("Message that is just barely short enough ")+len(" so it doesn't get truncated"))) + " so it doesn't get truncated", + }, + { + message: "Message that is just barely too long " + strings.Repeat("d", maxVReplicationLogMessageLen-(len("Message that is just barely too long ")+len(" so it gets truncated"))+1) + " so it gets truncated", + expectTruncation: true, + }, + { + message: "Super long message brosef wut r ya doin " + strings.Repeat("e", 60000) + strings.Repeat("f", 60000) + " so maybe don't do that to yourself and your friends", + expectTruncation: true, + }, + { + message: "Super duper long message brosef wut r ya doin " + strings.Repeat("g", 120602) + strings.Repeat("h", 120001) + " so maybe really don't do that to yourself and your friends", + expectTruncation: true, + }, + } + for _, tc := range tests { + t.Run("insertLog", func(t *testing.T) { + var ( + messageOut string + err error + ) + if tc.expectTruncation { + messageOut, err = textutil.TruncateText(tc.message, maxVReplicationLogMessageLen, binlogplayer.TruncationLocation, binlogplayer.TruncationIndicator) + require.NoError(t, err) + require.True(t, strings.HasPrefix(messageOut, tc.message[:1024])) // Confirm we still have the same beginning + require.True(t, strings.HasSuffix(messageOut, tc.message[len(tc.message)-1024:])) // Confirm we still have the same end + require.True(t, strings.Contains(messageOut, binlogplayer.TruncationIndicator)) // Confirm we have the truncation text + t.Logf("Original message length: %d, truncated message length: %d", len(tc.message), len(messageOut)) + } else { + messageOut = tc.message + } + require.LessOrEqual(t, len(messageOut), maxVReplicationLogMessageLen) + dbClient.ExpectRequest(fmt.Sprintf(insertStmtf, vrID, typ, state, encodeString(messageOut)), &sqltypes.Result{}, nil) + insertLog(vdbClient, typ, vrID, state, tc.message) + dbClient.Wait() + }) + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index dfe51f71dbd..d92522bdc80 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -246,10 +246,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Copying, ""); err != nil { return err } - if err := vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase started for %d table(s)", - len(plan.TargetTables))); err != nil { - return err - } + vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase started for %d table(s)", len(plan.TargetTables))) if vc.vr.supportsDeferredSecondaryKeys() { settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id) @@ -257,20 +254,15 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { return err } if settings.DeferSecondaryKeys { - if err := vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase temporarily dropping secondary keys for %d table(s)", - len(plan.TargetTables))); err != nil { - return err - } + vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase temporarily dropping secondary keys for %d table(s)", len(plan.TargetTables))) for _, tableName := range tableNames { if err := vc.vr.stashSecondaryKeys(ctx, tableName); err != nil { return err } } - if err := vc.vr.insertLog(LogCopyStart, + vc.vr.insertLog(LogCopyStart, fmt.Sprintf("Copy phase finished dropping secondary keys and saving post copy actions to restore them for %d table(s)", - len(plan.TargetTables))); err != nil { - return err - } + len(plan.TargetTables))) } } } else { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index d9b68d052c3..ac0691dc8bc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -2799,8 +2799,7 @@ func TestVReplicationLogs(t *testing.T) { for _, want := range expected { t.Run("", func(t *testing.T) { - err = insertLog(vdbc, LogMessage, 1, binlogdatapb.VReplicationWorkflowState_Running.String(), "message1") - require.NoError(t, err) + insertLog(vdbc, LogMessage, 1, binlogdatapb.VReplicationWorkflowState_Running.String(), "message1") qr, err := env.Mysqld.FetchSuperQuery(context.Background(), query) require.NoError(t, err) require.Equal(t, want, fmt.Sprintf("%v", qr.Rows)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index daa642c53af..8a01cf7c8ed 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -288,9 +288,7 @@ func (vr *vreplicator) replicate(ctx context.Context) error { return err } if numTablesToCopy == 0 { - if err := vr.insertLog(LogCopyEnd, fmt.Sprintf("Copy phase completed at gtid %s", settings.StartPos)); err != nil { - return err - } + vr.insertLog(LogCopyEnd, fmt.Sprintf("Copy phase completed at gtid %s", settings.StartPos)) } } case settings.StartPos.IsZero(): @@ -452,7 +450,7 @@ func (vr *vreplicator) readSettings(ctx context.Context, dbClient *vdbClient) (s return settings, numTablesToCopy, nil } -func (vr *vreplicator) setMessage(message string) error { +func (vr *vreplicator) setMessage(message string) (err error) { message = binlogplayer.MessageTruncate(message) vr.stats.History.Add(&binlogplayer.StatsHistoryRecord{ Time: time.Now(), @@ -464,14 +462,12 @@ func (vr *vreplicator) setMessage(message string) error { if _, err := vr.dbClient.Execute(query); err != nil { return fmt.Errorf("could not set message: %v: %v", query, err) } - if err := insertLog(vr.dbClient, LogMessage, vr.id, vr.state.String(), message); err != nil { - return err - } + insertLog(vr.dbClient, LogMessage, vr.id, vr.state.String(), message) return nil } -func (vr *vreplicator) insertLog(typ, message string) error { - return insertLog(vr.dbClient, typ, vr.id, vr.state.String(), message) +func (vr *vreplicator) insertLog(typ, message string) { + insertLog(vr.dbClient, typ, vr.id, vr.state.String(), message) } func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, message string) error { @@ -489,9 +485,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me if state == vr.state { return nil } - if err := insertLog(vr.dbClient, LogStateChange, vr.id, state.String(), message); err != nil { - return err - } + insertLog(vr.dbClient, LogStateChange, vr.id, state.String(), message) vr.state = state return nil @@ -815,10 +809,7 @@ func (vr *vreplicator) execPostCopyActions(ctx context.Context, tableName string return nil } - if err := vr.insertLog(LogCopyStart, fmt.Sprintf("Executing %d post copy action(s) for %s table", - len(qr.Rows), tableName)); err != nil { - return err - } + vr.insertLog(LogCopyStart, fmt.Sprintf("Executing %d post copy action(s) for %s table", len(qr.Rows), tableName)) // Save our connection ID so we can use it to easily KILL any // running SQL action we may perform later if needed. @@ -1039,10 +1030,7 @@ func (vr *vreplicator) execPostCopyActions(ctx context.Context, tableName string } } - if err := vr.insertLog(LogCopyStart, fmt.Sprintf("Completed all post copy actions for %s table", - tableName)); err != nil { - return err - } + vr.insertLog(LogCopyStart, fmt.Sprintf("Completed all post copy actions for %s table", tableName)) return nil }