diff --git a/go/test/endtoend/mysqlctl/mysqlctl_test.go b/go/test/endtoend/mysqlctl/mysqlctl_test.go index bdea4d3988c..6c3d65226e3 100644 --- a/go/test/endtoend/mysqlctl/mysqlctl_test.go +++ b/go/test/endtoend/mysqlctl/mysqlctl_test.go @@ -141,21 +141,21 @@ func initCluster(shardNames []string, totalTabletsRequired int) { func TestRestart(t *testing.T) { defer cluster.PanicHandler(t) err := primaryTablet.MysqlctlProcess.Stop() - require.Nil(t, err) + require.NoError(t, err) primaryTablet.MysqlctlProcess.CleanupFiles(primaryTablet.TabletUID) err = primaryTablet.MysqlctlProcess.Start() - require.Nil(t, err) + require.NoError(t, err) } func TestAutoDetect(t *testing.T) { defer cluster.PanicHandler(t) err := clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].VttabletProcess.Setup() - require.Nil(t, err, "error should be nil") + require.NoError(t, err) err = clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].VttabletProcess.Setup() - require.Nil(t, err, "error should be nil") + require.NoError(t, err) // Reparent tablets, which requires flavor detection err = clusterInstance.VtctldClientProcess.InitializeShard(keyspaceName, shardName, cell, primaryTablet.TabletUID) - require.Nil(t, err, "error should be nil") + require.NoError(t, err) } diff --git a/go/vt/mysqlctl/backup_test.go b/go/vt/mysqlctl/backup_test.go index ad7e0faab98..1fa3749ea8d 100644 --- a/go/vt/mysqlctl/backup_test.go +++ b/go/vt/mysqlctl/backup_test.go @@ -29,8 +29,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/mysql/replication" @@ -148,9 +150,8 @@ func TestFindFilesToBackupWithoutRedoLog(t *testing.T) { rocksdbDir := path.Join(dataDir, ".rocksdb") sdiOnlyDir := path.Join(dataDir, "sdi_dir") for _, s := range []string{innodbDataDir, innodbLogDir, dataDbDir, extraDir, outsideDbDir, rocksdbDir, sdiOnlyDir} { - if err := os.MkdirAll(s, os.ModePerm); err != nil { - t.Fatalf("failed to create directory %v: %v", s, err) - } + err := os.MkdirAll(s, os.ModePerm) + require.NoErrorf(t, err, "failed to create directory %v: %v", s, err) } innodbLogFile := "innodb_log_1" @@ -678,3 +679,50 @@ func (fbe *fakeBackupRestoreEnv) setStats(stats *backupstats.FakeStats) { fbe.restoreParams.Stats = nil fbe.stats = nil } + +func TestParseBackupName(t *testing.T) { + // backup name doesn't contain 3 parts + _, _, err := ParseBackupName("dir", "asd.saddsa") + assert.ErrorContains(t, err, "cannot backup name") + + // Invalid time + bt, al, err := ParseBackupName("dir", "2024-03-18.123.tablet_id") + assert.Nil(t, bt) + assert.Nil(t, al) + assert.NoError(t, err) + + // Valid case + bt, al, err = ParseBackupName("dir", "2024-03-18.180911.cell1-42") + assert.NotNil(t, *bt, time.Date(2024, 03, 18, 18, 9, 11, 0, time.UTC)) + assert.Equal(t, "cell1", al.Cell) + assert.Equal(t, uint32(42), al.Uid) + assert.NoError(t, err) +} + +func TestShouldRestore(t *testing.T) { + env := createFakeBackupRestoreEnv(t) + + b, err := ShouldRestore(env.ctx, env.restoreParams) + assert.False(t, b) + assert.Error(t, err) + + env.restoreParams.DeleteBeforeRestore = true + b, err = ShouldRestore(env.ctx, env.restoreParams) + assert.True(t, b) + assert.NoError(t, err) + env.restoreParams.DeleteBeforeRestore = false + + env.mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SHOW DATABASES": {Rows: [][]sqltypes.Value{{sqltypes.NewVarBinary("any_db")}}}, + } + b, err = ShouldRestore(env.ctx, env.restoreParams) + assert.NoError(t, err) + assert.True(t, b) + + env.mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SHOW DATABASES": {Rows: [][]sqltypes.Value{{sqltypes.NewVarBinary("test")}}}, + } + b, err = ShouldRestore(env.ctx, env.restoreParams) + assert.False(t, b) + assert.NoError(t, err) +} diff --git a/go/vt/mysqlctl/compression_test.go b/go/vt/mysqlctl/compression_test.go index 4215761dbe7..16fde00677c 100644 --- a/go/vt/mysqlctl/compression_test.go +++ b/go/vt/mysqlctl/compression_test.go @@ -19,14 +19,13 @@ package mysqlctl import ( "bytes" "context" - "errors" "fmt" "io" - "reflect" "strings" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/logutil" @@ -47,15 +46,8 @@ func TestGetExtensionFromEngine(t *testing.T) { for _, tt := range tests { t.Run(tt.engine, func(t *testing.T) { ext, err := getExtensionFromEngine(tt.engine) - // if err != tt.err { - if !errors.Is(err, tt.err) { - t.Errorf("got err: %v; expected: %v", err, tt.err) - } - // } - - if ext != tt.extension { - t.Errorf("got err: %v; expected: %v", ext, tt.extension) - } + assert.ErrorIs(t, err, tt.err) + assert.Equal(t, tt.extension, ext) }) } } @@ -69,33 +61,20 @@ func TestBuiltinCompressors(t *testing.T) { var compressed, decompressed bytes.Buffer reader := bytes.NewReader(data) compressor, err := newBuiltinCompressor(engine, &compressed, logger) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, err = io.Copy(compressor, reader) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + compressor.Close() decompressor, err := newBuiltinDecompressor(engine, &compressed, logger) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + _, err = io.Copy(&decompressed, decompressor) - if err != nil { - t.Error(err) - return - } - decompressor.Close() - if len(data) != len(decompressed.Bytes()) { - t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes())) - } + require.NoError(t, err) - if !reflect.DeepEqual(data, decompressed.Bytes()) { - t.Error("decompressed content differs from the original") - } + decompressor.Close() + assert.Equal(t, data, decompressed.Bytes()) }) } } @@ -142,33 +121,20 @@ func TestExternalCompressors(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() compressor, err := newExternalCompressor(ctx, tt.compress, &compressed, logger) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + _, err = io.Copy(compressor, reader) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + compressor.Close() decompressor, err := newExternalDecompressor(ctx, tt.decompress, &compressed, logger) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + _, err = io.Copy(&decompressed, decompressor) - if err != nil { - t.Error(err) - return - } + require.NoError(t, err) + decompressor.Close() - if len(data) != len(decompressed.Bytes()) { - t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes())) - } - if !reflect.DeepEqual(data, decompressed.Bytes()) { - t.Error("decompressed content differs from the original") - } + assert.Equal(t, data, decompressed.Bytes()) }) } @@ -190,19 +156,13 @@ func TestValidateExternalCmd(t *testing.T) { t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) { CmdName := tt.cmdName path, err := validateExternalCmd(CmdName) - if tt.path != "" { - if !strings.HasSuffix(path, tt.path) { - t.Errorf("Expected path \"%s\" to include \"%s\"", path, tt.path) - } - } + + assert.Contains(t, path, tt.path) + if tt.errStr == "" { - if err != nil { - t.Errorf("Expected result \"%v\", got \"%v\"", "", err) - } + assert.NoError(t, err) } else { - if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) { - t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err) - } + assert.ErrorContains(t, err, tt.errStr) } }) } diff --git a/go/vt/mysqlctl/mycnf_test.go b/go/vt/mysqlctl/mycnf_test.go index 7b8c2b1ddf0..bb3d6611c86 100644 --- a/go/vt/mysqlctl/mycnf_test.go +++ b/go/vt/mysqlctl/mycnf_test.go @@ -19,11 +19,11 @@ package mysqlctl import ( "bytes" "os" - "strings" "sync" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql/collations" @@ -120,14 +120,11 @@ func NoTestMycnfHook(t *testing.T) { servenv.OnClose(mysqld.Close) err := mysqld.InitConfig(cnf) - if err != nil { - t.Errorf("err: %v", err) - } + require.NoError(t, err) + _, err = os.ReadFile(cnf.Path) - if err != nil { - t.Errorf("failed reading, err %v", err) - return - } + require.NoError(t, err) + mycnf := NewMycnf(uid, 0) mycnf.Path = cnf.Path mycnf, err = ReadMycnf(mycnf, 0) @@ -137,33 +134,17 @@ func NoTestMycnfHook(t *testing.T) { t.Logf("socket file %v", mycnf.SocketFile) } // Tablet UID should be 11111, which determines tablet/data dir. - if got, want := mycnf.DataDir, "/vt_0000011111/"; !strings.Contains(got, want) { - t.Errorf("mycnf.DataDir = %v, want *%v*", got, want) - } + assert.Contains(t, mycnf.DataDir, "/vt_0000011111/") + // MySQL server-id should be 22222, different from Tablet UID. - if got, want := mycnf.ServerID, uint32(22222); got != want { - t.Errorf("mycnf.ServerID = %v, want %v", got, want) - } + assert.Equal(t, uint32(22222), mycnf.ServerID) + // check that the env variables we set were passed correctly to the hook - if got, want := mycnf.lookup("KEYSPACE"), "test-messagedb"; got != want { - t.Errorf("Error passing env %v, got %v, want %v", "KEYSPACE", got, want) - } - if got, want := mycnf.lookup("SHARD"), "0"; got != want { - t.Errorf("Error passing env %v, got %v, want %v", "SHARD", got, want) - } - if got, want := mycnf.lookup("TABLET_TYPE"), "PRIMARY"; got != want { - t.Errorf("Error passing env %v, got %v, want %v", "TABLET_TYPE", got, want) - } - if got, want := mycnf.lookup("TABLET_ID"), "11111"; got != want { - t.Errorf("Error passing env %v, got %v, want %v", "TABLET_ID", got, want) - } - if got, want := mycnf.lookup("TABLET_DIR"), "/vt_0000011111"; !strings.Contains(got, want) { - t.Errorf("Error passing env %v, got %v, want %v", "TABLET_DIR", got, want) - } - if got, want := mycnf.lookup("MYSQL_PORT"), "15306"; got != want { - t.Errorf("Error passing env %v, got %v, want %v", "MYSQL_PORT", got, want) - } - if got := mycnf.lookup("MY_VAR"); got != "" { - t.Errorf("Unexpected env %v set to %v", "MY_VAR", got) - } + assert.Equal(t, "test-messagedb", mycnf.lookup("KEYSPACE")) + assert.Equal(t, "test-0", mycnf.lookup("SHARD")) + assert.Equal(t, "PRIMARY", mycnf.lookup("TABLET_TYPE")) + assert.Equal(t, "11111", mycnf.lookup("TABLET_ID")) + assert.Equal(t, "/vt_0000011111", mycnf.lookup("TABLET_DIR")) + assert.Equal(t, "15306", mycnf.lookup("MYSQL_PORT")) + assert.Equal(t, "", mycnf.lookup("MY_VAR")) } diff --git a/go/vt/mysqlctl/mysqld_test.go b/go/vt/mysqlctl/mysqld_test.go index b1e5e9a2916..cc31206aa0c 100644 --- a/go/vt/mysqlctl/mysqld_test.go +++ b/go/vt/mysqlctl/mysqld_test.go @@ -17,13 +17,19 @@ limitations under the License. package mysqlctl import ( + "context" "os" "strconv" + "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" ) type testcase struct { @@ -204,3 +210,100 @@ func TestCleanupLockfile(t *testing.T) { assert.Error(t, cleanupLockfile("mysql.sock", ts)) assert.FileExists(t, "mysql.sock.lock") } + +func TestRunMysqlUpgrade(t *testing.T) { + ver, err := GetVersionString() + require.NoError(t, err) + if strings.Contains(ver, "5.7") { + t.Skipf("Run the test only for 8.0") + } + + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + err = testMysqld.RunMysqlUpgrade(ctx) + assert.NoError(t, err) + + // Should not fail for older versions + testMysqld.capabilities = newCapabilitySet(FlavorMySQL, ServerVersion{Major: 8, Minor: 0, Patch: 15}) + err = testMysqld.RunMysqlUpgrade(ctx) + assert.NoError(t, err) +} + +func TestGetDbaConnection(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + + conn, err := testMysqld.GetDbaConnection(ctx) + assert.NoError(t, err) + assert.NoError(t, conn.Ping()) + defer conn.Close() +} + +func TestGetVersionString(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + str, err := testMysqld.GetVersionString(ctx) + assert.NoError(t, err) + assert.NotEmpty(t, str) + + ver := "test_version" + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery(versionSQLQuery, sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), ver)) + + str, err = testMysqld.GetVersionString(ctx) + assert.Equal(t, ver, str) + assert.NoError(t, err) +} + +func TestGetVersionComment(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("select @@global.version_comment", sqltypes.MakeTestResult(sqltypes.MakeTestFields("@@global.version_comment", "varchar"), "test_version1", "test_version2")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + _, err := testMysqld.GetVersionComment(ctx) + assert.ErrorContains(t, err, "unexpected result length") + + ver := "test_version" + db.AddQuery("select @@global.version_comment", sqltypes.MakeTestResult(sqltypes.MakeTestFields("@@global.version_comment", "varchar"), ver)) + + str, err := testMysqld.GetVersionComment(ctx) + assert.NoError(t, err) + assert.Equal(t, ver, str) +} diff --git a/go/vt/mysqlctl/permissions_test.go b/go/vt/mysqlctl/permissions_test.go new file mode 100644 index 00000000000..5a8954fac15 --- /dev/null +++ b/go/vt/mysqlctl/permissions_test.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" +) + +func TestGetPermissions(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + testMysqld := NewFakeMysqlDaemon(db) + defer testMysqld.Close() + + testMysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SELECT * FROM mysql.user ORDER BY host, user": sqltypes.MakeTestResult(sqltypes.MakeTestFields("host|user", "varchar|varchar"), "test_host1|test_user1", "test_host2|test_user2"), + "SELECT * FROM mysql.db ORDER BY host, db, user": sqltypes.MakeTestResult(sqltypes.MakeTestFields("host|user|db", "varchar|varchar|varchar"), "test_host1|test_user1|test_db1", "test_host2|test_user2|test_db2"), + } + + per, err := GetPermissions(testMysqld) + assert.NoError(t, err) + assert.Len(t, per.DbPermissions, 2) + assert.Len(t, per.UserPermissions, 2) +} diff --git a/go/vt/mysqlctl/redo_log_test.go b/go/vt/mysqlctl/redo_log_test.go new file mode 100644 index 00000000000..ae2005bdc51 --- /dev/null +++ b/go/vt/mysqlctl/redo_log_test.go @@ -0,0 +1,52 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" +) + +func TestProcessCanDisableRedoLog(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT variable_value FROM performance_schema.global_status WHERE variable_name = 'innodb_redo_log_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "varchar"), "val1")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res, err := testMysqld.ProcessCanDisableRedoLog(context.Background()) + assert.NoError(t, err) + assert.True(t, res) + + db.AddQuery("SELECT variable_value FROM performance_schema.global_status WHERE variable_name = 'innodb_redo_log_enabled'", &sqltypes.Result{}) + res, err = testMysqld.ProcessCanDisableRedoLog(context.Background()) + assert.Error(t, err) + assert.False(t, res) +} diff --git a/go/vt/mysqlctl/reparent_test.go b/go/vt/mysqlctl/reparent_test.go new file mode 100644 index 00000000000..7d43ffe9d30 --- /dev/null +++ b/go/vt/mysqlctl/reparent_test.go @@ -0,0 +1,95 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/logutil" +) + +func TestPopulateReparentJournal(t *testing.T) { + input := `MySQL binlog position: filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3, + 1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3, + 47b59de1-b368-11e9-b48b-624401d35560:1-152981, + 557def0a-b368-11e9-84ed-f6fffd91cc57:1-3, + 599ef589-ae55-11e9-9688-ca1f44501925:1-14857169, + b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262' + MySQL slave binlog position: master host '10.128.0.43', purge list '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3, 1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3, 47b59de1-b368-11e9-b48b-624401d35560:1-152981, 557def0a-b368-11e9-84ed-f6fffd91cc57:1-3, 599ef589-ae55-11e9-9688-ca1f44501925:1-14857169, b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262', channel name: '' + + 190809 00:15:44 [00] Streaming + 190809 00:15:44 [00] ...done + 190809 00:15:44 [00] Streaming + 190809 00:15:44 [00] ...done + xtrabackup: Transaction log of lsn (405344842034) to (406364859653) was copied. + 190809 00:16:14 completed OK!` + + pos, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) + require.NoError(t, err) + + res := PopulateReparentJournal(1, "action", "primaryAlias", pos) + want := `INSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES (1, 'action', 'primaryAlias', 'MySQL56/145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262')` + assert.Equal(t, want, res) +} + +func TestWaitForReparentJournal(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT action_name, primary_alias, replication_position FROM _vt.reparent_journal WHERE time_created_ns=5", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "test_row")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + err := testMysqld.WaitForReparentJournal(ctx, 5) + assert.NoError(t, err) +} + +func TestPromote(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("STOP SLAVE", &sqltypes.Result{}) + db.AddQuery("RESET SLAVE ALL", &sqltypes.Result{}) + db.AddQuery("FLUSH BINARY LOGS", &sqltypes.Result{}) + db.AddQuery("SELECT @@global.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:12-17")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + pos, err := testMysqld.Promote(map[string]string{}) + assert.NoError(t, err) + assert.Equal(t, "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8:12-17", pos.String()) +} diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index af20bbef85f..e53caac593d 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -40,27 +40,27 @@ type ResetSuperReadOnlyFunc func() error // WaitForReplicationStart waits until the deadline for replication to start. // This validates the current primary is correct and can be connected to. -func WaitForReplicationStart(mysqld MysqlDaemon, replicaStartDeadline int) error { - var rowMap map[string]string +func WaitForReplicationStart(mysqld MysqlDaemon, replicaStartDeadline int) (err error) { + var replicaStatus replication.ReplicationStatus for replicaWait := 0; replicaWait < replicaStartDeadline; replicaWait++ { - status, err := mysqld.ReplicationStatus() + replicaStatus, err = mysqld.ReplicationStatus() if err != nil { return err } - if status.Running() { + if replicaStatus.Running() { return nil } time.Sleep(time.Second) } - - errorKeys := []string{"Last_Error", "Last_IO_Error", "Last_SQL_Error"} - errs := make([]string, 0, len(errorKeys)) - for _, key := range errorKeys { - if rowMap[key] != "" { - errs = append(errs, key+": "+rowMap[key]) - } + errs := make([]string, 0, 2) + if replicaStatus.LastSQLError != "" { + errs = append(errs, "Last_SQL_Error: "+replicaStatus.LastSQLError) } + if replicaStatus.LastIOError != "" { + errs = append(errs, "Last_IO_Error: "+replicaStatus.LastIOError) + } + if len(errs) != 0 { return errors.New(strings.Join(errs, ", ")) } diff --git a/go/vt/mysqlctl/replication_test.go b/go/vt/mysqlctl/replication_test.go index 1502ad4773e..e171379f668 100644 --- a/go/vt/mysqlctl/replication_test.go +++ b/go/vt/mysqlctl/replication_test.go @@ -17,13 +17,22 @@ limitations under the License. package mysqlctl import ( + "context" + "fmt" + "net" "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" ) func testRedacted(t *testing.T, source, expected string) { - if r := redactPassword(source); r != expected { - t.Errorf("redactPassword bad result: %v\nWas expecting:%v", r, expected) - } + assert.Equal(t, expected, redactPassword(source)) } func TestRedactMasterPassword(t *testing.T) { @@ -80,3 +89,603 @@ func TestRedactPassword(t *testing.T) { PASSWORD = '****' `) } + +func TestWaitForReplicationStart(t *testing.T) { + db := fakesqldb.New(t) + fakemysqld := NewFakeMysqlDaemon(db) + + defer func() { + db.Close() + fakemysqld.Close() + }() + + err := WaitForReplicationStart(fakemysqld, 2) + assert.NoError(t, err) + + fakemysqld.ReplicationStatusError = fmt.Errorf("test error") + err = WaitForReplicationStart(fakemysqld, 2) + assert.ErrorContains(t, err, "test error") + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW SLAVE STATUS", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Last_SQL_Error|Last_IO_Error", "varchar|varchar"), "test sql error|test io error")) + + err = WaitForReplicationStart(testMysqld, 2) + assert.ErrorContains(t, err, "Last_SQL_Error: test sql error, Last_IO_Error: test io error") +} + +func TestGetMysqlPort(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW VARIABLES LIKE 'port'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field|test_field2", "varchar|uint64"), "test_port|12")) + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res, err := testMysqld.GetMysqlPort() + assert.Equal(t, int32(12), res) + assert.NoError(t, err) + + db.AddQuery("SHOW VARIABLES LIKE 'port'", &sqltypes.Result{}) + res, err = testMysqld.GetMysqlPort() + assert.ErrorContains(t, err, "no port variable in mysql") + assert.Equal(t, int32(0), res) +} + +func TestGetServerID(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("select @@global.server_id", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "uint64"), "12")) + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + res, err := testMysqld.GetServerID(ctx) + assert.Equal(t, uint32(12), res) + assert.NoError(t, err) + + db.AddQuery("select @@global.server_id", &sqltypes.Result{}) + res, err = testMysqld.GetServerID(ctx) + assert.ErrorContains(t, err, "no server_id in mysql") + assert.Equal(t, uint32(0), res) +} + +func TestGetServerUUID(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + uuid := "test_uuid" + db.AddQuery("SELECT @@global.server_uuid", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), uuid)) + + ctx := context.Background() + res, err := testMysqld.GetServerUUID(ctx) + assert.Equal(t, uuid, res) + assert.NoError(t, err) + + db.AddQuery("SELECT @@global.server_uuid", &sqltypes.Result{}) + res, err = testMysqld.GetServerUUID(ctx) + assert.Error(t, err) + assert.Equal(t, "", res) +} + +func TestWaitSourcePos(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT @@global.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:12-17")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + err := testMysqld.WaitSourcePos(ctx, replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}) + assert.NoError(t, err) + + db.AddQuery("SELECT @@global.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "invalid_id")) + err = testMysqld.WaitSourcePos(ctx, replication.Position{GTIDSet: replication.Mysql56GTIDSet{}}) + assert.ErrorContains(t, err, "invalid MySQL 5.6 GTID set") +} + +func TestReplicationStatus(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW SLAVE STATUS", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "test_status")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res, err := testMysqld.ReplicationStatus() + assert.NoError(t, err) + assert.True(t, res.ReplicationLagUnknown) + + db.AddQuery("SHOW SLAVE STATUS", &sqltypes.Result{}) + res, err = testMysqld.ReplicationStatus() + assert.Error(t, err) + assert.False(t, res.ReplicationLagUnknown) +} + +func TestPrimaryStatus(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW MASTER STATUS", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "test_status")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + res, err := testMysqld.PrimaryStatus(ctx) + assert.NoError(t, err) + assert.NotNil(t, res) + + db.AddQuery("SHOW MASTER STATUS", &sqltypes.Result{}) + _, err = testMysqld.PrimaryStatus(ctx) + assert.ErrorContains(t, err, "no master status") +} + +func TestGetGTIDPurged(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT @@global.gtid_purged", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:12-17")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + res, err := testMysqld.GetGTIDPurged(ctx) + assert.NoError(t, err) + assert.Equal(t, "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8:12-17", res.String()) +} + +func TestPrimaryPosition(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT @@global.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:12-17")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res, err := testMysqld.PrimaryPosition() + assert.NoError(t, err) + assert.Equal(t, "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8:12-17", res.String()) +} + +func TestSetReplicationPosition(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("RESET MASTER", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + + pos := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}} + sid := replication.SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + pos.GTIDSet = pos.GTIDSet.AddGTID(replication.Mysql56GTID{Server: sid, Sequence: 1}) + + err := testMysqld.SetReplicationPosition(ctx, pos) + assert.Error(t, err) + + // We expect this query to be executed + db.AddQuery("SET GLOBAL gtid_purged = '00010203-0405-0607-0809-0a0b0c0d0e0f:1'", &sqltypes.Result{}) + + err = testMysqld.SetReplicationPosition(ctx, pos) + assert.NoError(t, err) +} + +func TestSetReplicationSource(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("RESET MASTER", &sqltypes.Result{}) + db.AddQuery("STOP SLAVE", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + + // We expect query containing passed host and port to be executed + err := testMysqld.SetReplicationSource(ctx, "test_host", 2, true, true) + assert.ErrorContains(t, err, `MASTER_HOST = 'test_host'`) + assert.ErrorContains(t, err, `MASTER_PORT = 2`) + assert.ErrorContains(t, err, `CHANGE MASTER TO`) +} + +func TestResetReplication(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW GLOBAL VARIABLES LIKE 'rpl_semi_sync%'", &sqltypes.Result{}) + db.AddQuery("STOP SLAVE", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + err := testMysqld.ResetReplication(ctx) + assert.ErrorContains(t, err, "RESET SLAVE ALL") + + // We expect this query to be executed + db.AddQuery("RESET SLAVE ALL", &sqltypes.Result{}) + err = testMysqld.ResetReplication(ctx) + assert.ErrorContains(t, err, "RESET MASTER") + + // We expect this query to be executed + db.AddQuery("RESET MASTER", &sqltypes.Result{}) + err = testMysqld.ResetReplication(ctx) + assert.NoError(t, err) +} + +func TestResetReplicationParameters(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW GLOBAL VARIABLES LIKE 'rpl_semi_sync%'", &sqltypes.Result{}) + db.AddQuery("STOP SLAVE", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + err := testMysqld.ResetReplicationParameters(ctx) + assert.ErrorContains(t, err, "RESET SLAVE ALL") + + // We expect this query to be executed + db.AddQuery("RESET SLAVE ALL", &sqltypes.Result{}) + err = testMysqld.ResetReplicationParameters(ctx) + assert.NoError(t, err) +} + +func TestFindReplicas(t *testing.T) { + db := fakesqldb.New(t) + fakemysqld := NewFakeMysqlDaemon(db) + + defer func() { + db.Close() + fakemysqld.Close() + }() + + fakemysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SHOW PROCESSLIST": sqltypes.MakeTestResult(sqltypes.MakeTestFields("Id|User|Host|db|Command|Time|State|Info", "varchar|varchar|varchar|varchar|varchar|varchar|varchar|varchar"), "1|user1|localhost:12|db1|Binlog Dump|54|Has sent all binlog to slave|NULL"), + } + + res, err := FindReplicas(fakemysqld) + assert.NoError(t, err) + + want, err := net.LookupHost("localhost") + require.NoError(t, err) + + assert.Equal(t, want, res) +} + +func TestGetBinlogInformation(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates, @@global.binlog_row_image", sqltypes.MakeTestResult(sqltypes.MakeTestFields("@@global.binlog_format|@@global.log_bin|@@global.log_slave_updates|@@global.binlog_row_image", "varchar|int64|int64|varchar"), "binlog|1|2|row_image")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + bin, logBin, slaveUpdate, rowImage, err := testMysqld.GetBinlogInformation(ctx) + assert.NoError(t, err) + assert.Equal(t, "binlog", bin) + assert.Equal(t, "row_image", rowImage) + assert.True(t, logBin) + assert.False(t, slaveUpdate) +} + +func TestGetGTIDMode(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + in := "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:12-17" + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("select @@global.gtid_mode", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), in)) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + res, err := testMysqld.GetGTIDMode(ctx) + assert.NoError(t, err) + assert.Equal(t, in, res) +} + +func TestFlushBinaryLogs(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + // We expect this query to be executed + err := testMysqld.FlushBinaryLogs(context.Background()) + assert.ErrorContains(t, err, "FLUSH BINARY LOGS") +} + +func TestGetBinaryLogs(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + db.AddQuery("SHOW BINARY LOGS", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field", "varchar"), "binlog1", "binlog2")) + + res, err := testMysqld.GetBinaryLogs(context.Background()) + assert.NoError(t, err) + assert.Len(t, res, 2) + assert.Contains(t, res, "binlog1") + assert.Contains(t, res, "binlog2") +} + +func TestGetPreviousGTIDs(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW BINLOG EVENTS IN 'binlog' LIMIT 2", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Event_type|Info", "varchar|varchar"), "Previous_gtids|8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + res, err := testMysqld.GetPreviousGTIDs(ctx, "binlog") + assert.NoError(t, err) + assert.Equal(t, "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8", res) +} + +func TestSetSemiSyncEnabled(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + // We expect this query to be executed + err := testMysqld.SetSemiSyncEnabled(true, true) + assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 1, GLOBAL rpl_semi_sync_slave_enabled = 1") + + // We expect this query to be executed + err = testMysqld.SetSemiSyncEnabled(true, false) + assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 1, GLOBAL rpl_semi_sync_slave_enabled = 0") + + // We expect this query to be executed + err = testMysqld.SetSemiSyncEnabled(false, true) + assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 0, GLOBAL rpl_semi_sync_slave_enabled = 1") +} + +func TestSemiSyncEnabled(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_master_enabled|OFF", "rpl_semi_sync_slave_enabled|ON")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + p, r := testMysqld.SemiSyncEnabled() + assert.False(t, p) + assert.True(t, r) +} + +func TestSemiSyncStatus(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_%_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "Rpl_semi_sync_master_status|ON", "Rpl_semi_sync_slave_status|OFF")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + p, r := testMysqld.SemiSyncStatus() + assert.True(t, p) + assert.False(t, r) +} + +func TestSemiSyncClients(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "val1|12")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res := testMysqld.SemiSyncClients() + assert.Equal(t, uint32(12), res) +} + +func TestSemiSyncSettings(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_master_timeout|123", "rpl_semi_sync_master_wait_for_slave_count|80")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + timeout, replicas := testMysqld.SemiSyncSettings() + assert.Equal(t, uint64(123), timeout) + assert.Equal(t, uint32(80), replicas) +} + +func TestSemiSyncReplicationStatus(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_slave_status|ON")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res, err := testMysqld.SemiSyncReplicationStatus() + assert.NoError(t, err) + assert.True(t, res) + + db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_slave_status|OFF")) + + res, err = testMysqld.SemiSyncReplicationStatus() + assert.NoError(t, err) + assert.False(t, res) +} + +func TestSemiSyncExtensionLoaded(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + db.AddQuery("SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "int64"), "1")) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + res, err := testMysqld.SemiSyncExtensionLoaded() + assert.NoError(t, err) + assert.True(t, res) + + db.AddQuery("SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "int64"), "0")) + + res, err = testMysqld.SemiSyncExtensionLoaded() + assert.NoError(t, err) + assert.False(t, res) +} diff --git a/go/vt/mysqlctl/schema_test.go b/go/vt/mysqlctl/schema_test.go index fb64f8ca8ee..d73e6c13665 100644 --- a/go/vt/mysqlctl/schema_test.go +++ b/go/vt/mysqlctl/schema_test.go @@ -1,14 +1,35 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package mysqlctl import ( + "context" "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" ) var queryMap map[string]*sqltypes.Result @@ -103,3 +124,286 @@ func TestColumnList(t *testing.T) { require.Equal(t, `[name:"col1" type:VARCHAR]`, fmt.Sprintf("%+v", fields)) } + +func TestGetSchemaAndSchemaChange(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + db.AddQuery("SHOW CREATE DATABASE IF NOT EXISTS `fakesqldb`", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field|cmd", "varchar|varchar"), "create_db|create_db_cmd")) + db.AddQuery("SHOW CREATE TABLE `fakesqldb`.`test_table`", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field|cmd", "varchar|varchar"), "create_table|create_table_cmd")) + + db.AddQuery("SELECT table_name, table_type, data_length, table_rows FROM information_schema.tables WHERE table_schema = 'fakesqldb' AND table_type = 'BASE TABLE'", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("table_name|table_type|data_length|table_rows", "varchar|varchar|uint64|uint64"), "test_table|test_type|NULL|2")) + + db.AddQuery("SELECT table_name, table_type, data_length, table_rows FROM information_schema.tables WHERE table_schema = 'fakesqldb'", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("table_name|table_type|data_length|table_rows", "varchar|varchar|uint64|uint64"), "test_table|test_type|NULL|2")) + + query := fmt.Sprintf(GetColumnNamesQuery, sqltypes.EncodeStringSQL(db.Name()), sqltypes.EncodeStringSQL("test_table")) + db.AddQuery(query, &sqltypes.Result{ + Fields: []*querypb.Field{{ + Name: "column_name", + Type: sqltypes.VarChar, + }}, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarChar("col1")}, + {sqltypes.NewVarChar("col2")}, + }, + }) + + db.AddQuery("SELECT `col1`, `col2` FROM `fakesqldb`.`test_table` WHERE 1 != 1", &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "col1", + Type: sqltypes.VarChar, + }, + { + Name: "col2", + Type: sqltypes.VarChar, + }, + }, + Rows: [][]sqltypes.Value{}, + }) + + tableList, err := tableListSQL([]string{"test_table"}) + require.NoError(t, err) + + query = ` + SELECT TABLE_NAME as table_name, COLUMN_NAME as column_name + FROM information_schema.STATISTICS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME IN %s AND LOWER(INDEX_NAME) = 'primary' + ORDER BY table_name, SEQ_IN_INDEX` + query = fmt.Sprintf(query, sqltypes.EncodeStringSQL("fakesqldb"), tableList) + db.AddQuery(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields("TABLE_NAME|COLUMN_NAME", "varchar|varchar"), "test_table|col1", "test_table|col2")) + + ctx := context.Background() + res, err := testMysqld.GetSchema(ctx, db.Name(), &tabletmanagerdata.GetSchemaRequest{}) + assert.NoError(t, err) + assert.Equal(t, res.String(), `database_schema:"create_db_cmd" table_definitions:{name:"test_table" schema:"create_table_cmd" columns:"col1" columns:"col2" type:"test_type" row_count:2 fields:{name:"col1" type:VARCHAR} fields:{name:"col2" type:VARCHAR}}`) + + // Test ApplySchemaChange + db.AddQuery("\nSET sql_log_bin = 0", &sqltypes.Result{}) + + r, err := testMysqld.ApplySchemaChange(ctx, db.Name(), &tmutils.SchemaChange{}) + assert.NoError(t, err) + assert.Equal(t, r.BeforeSchema, r.AfterSchema, "BeforeSchema should be equal to AfterSchema as no schema change was passed") + assert.Equal(t, `database_schema:"create_db_cmd" table_definitions:{name:"test_table" schema:"create_table_cmd" columns:"col1" columns:"col2" type:"test_type" row_count:2 fields:{name:"col1" type:VARCHAR} fields:{name:"col2" type:VARCHAR}}`, r.BeforeSchema.String()) + + r, err = testMysqld.ApplySchemaChange(ctx, db.Name(), &tmutils.SchemaChange{ + BeforeSchema: &tabletmanagerdata.SchemaDefinition{ + DatabaseSchema: "create_db_cmd", + TableDefinitions: []*tabletmanagerdata.TableDefinition{ + { + Name: "test_table_changed", + Schema: "create_table_cmd", + Type: "test_type", + }, + }, + }, + AfterSchema: &tabletmanagerdata.SchemaDefinition{ + DatabaseSchema: "create_db_cmd", + TableDefinitions: []*tabletmanagerdata.TableDefinition{ + { + Name: "test_table", + Schema: "create_table_cmd", + Type: "test_type", + }, + }, + }, + }) + assert.NoError(t, err) + assert.Equal(t, r.BeforeSchema, r.AfterSchema) + + r, err = testMysqld.ApplySchemaChange(ctx, db.Name(), &tmutils.SchemaChange{ + BeforeSchema: &tabletmanagerdata.SchemaDefinition{ + DatabaseSchema: "create_db_cmd", + TableDefinitions: []*tabletmanagerdata.TableDefinition{ + { + Name: "test_table", + Schema: "create_table_cmd", + Type: "test_type", + }, + }, + }, + SQL: "EXPECT THIS QUERY TO BE EXECUTED;\n", + }) + assert.ErrorContains(t, err, "EXPECT THIS QUERY TO BE EXECUTED") + assert.Nil(t, r) + + // Test PreflightSchemaChange + db.AddQuery("SET sql_log_bin = 0", &sqltypes.Result{}) + db.AddQuery("\nDROP DATABASE IF EXISTS _vt_preflight", &sqltypes.Result{}) + db.AddQuery("\nCREATE DATABASE _vt_preflight", &sqltypes.Result{}) + db.AddQuery("\nUSE _vt_preflight", &sqltypes.Result{}) + db.AddQuery("\nSET foreign_key_checks = 0", &sqltypes.Result{}) + db.AddQuery("\nDROP DATABASE _vt_preflight", &sqltypes.Result{}) + + l, err := testMysqld.PreflightSchemaChange(context.Background(), db.Name(), []string{}) + assert.NoError(t, err) + assert.Empty(t, l) + + db.AddQuery("SHOW CREATE DATABASE IF NOT EXISTS `_vt_preflight`", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field|cmd", "varchar|varchar"), "create_db|create_db_cmd")) + + db.AddQuery("SELECT table_name, table_type, data_length, table_rows FROM information_schema.tables WHERE table_schema = '_vt_preflight' AND table_type = 'BASE TABLE'", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("table_name|table_type|data_length|table_rows", "varchar|varchar|uint64|uint64"), "test_table|test_type|NULL|2")) + db.AddQuery("SELECT table_name, table_type, data_length, table_rows FROM information_schema.tables WHERE table_schema = '_vt_preflight'", sqltypes.MakeTestResult( + sqltypes.MakeTestFields("table_name|table_type|data_length|table_rows", "varchar|varchar|uint64|uint64"), "test_table|test_type|NULL|2")) + db.AddQuery("SHOW CREATE TABLE `_vt_preflight`.`test_table`", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field|cmd", "varchar|varchar"), "create_table|create_table_cmd")) + + query = ` + SELECT TABLE_NAME as table_name, COLUMN_NAME as column_name + FROM information_schema.STATISTICS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME IN %s AND LOWER(INDEX_NAME) = 'primary' + ORDER BY table_name, SEQ_IN_INDEX` + query = fmt.Sprintf(query, sqltypes.EncodeStringSQL("_vt_preflight"), tableList) + db.AddQuery(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields("TABLE_NAME|COLUMN_NAME", "varchar|varchar"), "test_table|col1", "test_table|col2")) + + query = fmt.Sprintf(GetColumnNamesQuery, sqltypes.EncodeStringSQL("_vt_preflight"), sqltypes.EncodeStringSQL("test_table")) + db.AddQuery(query, &sqltypes.Result{ + Fields: []*querypb.Field{{ + Name: "column_name", + Type: sqltypes.VarChar, + }}, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarChar("col1")}, + {sqltypes.NewVarChar("col2")}, + }, + }) + + db.AddQuery("SELECT `col1`, `col2` FROM `_vt_preflight`.`test_table` WHERE 1 != 1", &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "col1", + Type: sqltypes.VarChar, + }, + { + Name: "col2", + Type: sqltypes.VarChar, + }, + }, + Rows: [][]sqltypes.Value{}, + }) + + query = "EXPECT THIS QUERY TO BE EXECUTED" + _, err = testMysqld.PreflightSchemaChange(context.Background(), db.Name(), []string{query}) + assert.ErrorContains(t, err, query) +} + +func TestResolveTables(t *testing.T) { + db := fakesqldb.New(t) + testMysqld := NewFakeMysqlDaemon(db) + + defer func() { + db.Close() + testMysqld.Close() + }() + + ctx := context.Background() + res, err := ResolveTables(ctx, testMysqld, db.Name(), []string{}) + assert.ErrorContains(t, err, "no schema defined") + assert.Nil(t, res) + + testMysqld.Schema = &tabletmanagerdata.SchemaDefinition{TableDefinitions: tableDefinitions{{ + Name: "table1", + Schema: "schema1", + }, { + Name: "table2", + Schema: "schema2", + }}} + + res, err = ResolveTables(ctx, testMysqld, db.Name(), []string{"table1"}) + assert.NoError(t, err) + assert.Len(t, res, 1) + + res, err = ResolveTables(ctx, testMysqld, db.Name(), []string{"table1", "table2"}) + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestGetColumns(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, db.Name()) + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + tableName := "test_table" + query := fmt.Sprintf(GetColumnNamesQuery, sqltypes.EncodeStringSQL(db.Name()), sqltypes.EncodeStringSQL(tableName)) + db.AddQuery(query, &sqltypes.Result{ + Fields: []*querypb.Field{{ + Name: "column_name", + Type: sqltypes.VarChar, + }}, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarChar("col1")}, + {sqltypes.NewVarChar("col2")}, + }, + }) + db.AddQuery("SELECT `col1`, `col2` FROM `fakesqldb`.`test_table` WHERE 1 != 1", &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "col1", + Type: sqltypes.VarChar, + }, + { + Name: "col2", + Type: sqltypes.VarChar, + }, + }, + Rows: [][]sqltypes.Value{}, + }) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + ctx := context.Background() + + want := sqltypes.MakeTestFields("col1|col2", "varchar|varchar") + + field, cols, err := testMysqld.GetColumns(ctx, db.Name(), tableName) + assert.Equal(t, want, field) + assert.Equal(t, []string{"col1", "col2"}, cols) + assert.NoError(t, err) +} + +func TestGetPrimaryKeyColumns(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, db.Name()) + + db.AddQuery("SELECT 1", &sqltypes.Result{}) + + testMysqld := NewMysqld(dbc) + defer testMysqld.Close() + + tableList, err := tableListSQL([]string{"test_table"}) + require.NoError(t, err) + + query := ` + SELECT TABLE_NAME as table_name, COLUMN_NAME as column_name + FROM information_schema.STATISTICS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME IN %s AND LOWER(INDEX_NAME) = 'primary' + ORDER BY table_name, SEQ_IN_INDEX` + query = fmt.Sprintf(query, sqltypes.EncodeStringSQL("fakesqldb"), tableList) + db.AddQuery(query, sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|column_name", "varchar|varchar"), "fakesqldb|col1", "fakesqldb2|col2")) + + ctx := context.Background() + res, err := testMysqld.GetPrimaryKeyColumns(ctx, db.Name(), "test_table") + assert.NoError(t, err) + assert.Contains(t, res, "col1") + assert.Len(t, res, 1) +} diff --git a/go/vt/mysqlctl/xtrabackupengine_test.go b/go/vt/mysqlctl/xtrabackupengine_test.go index 84230c54520..4ceec2f960d 100644 --- a/go/vt/mysqlctl/xtrabackupengine_test.go +++ b/go/vt/mysqlctl/xtrabackupengine_test.go @@ -46,12 +46,8 @@ func TestFindReplicationPosition(t *testing.T) { want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262" pos, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err != nil { - t.Fatalf("findReplicationPosition error: %v", err) - } - if got := pos.String(); got != want { - t.Errorf("findReplicationPosition() = %v; want %v", got, want) - } + assert.NoError(t, err) + assert.Equal(t, want, pos.String()) } func TestFindReplicationPositionNoMatch(t *testing.T) { @@ -59,9 +55,7 @@ func TestFindReplicationPositionNoMatch(t *testing.T) { input := `nothing` _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err == nil { - t.Fatalf("expected error from findReplicationPosition but got nil") - } + assert.Error(t, err) } func TestFindReplicationPositionEmptyMatch(t *testing.T) { @@ -71,9 +65,7 @@ func TestFindReplicationPositionEmptyMatch(t *testing.T) { '` _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err == nil { - t.Fatalf("expected error from findReplicationPosition but got nil") - } + assert.Error(t, err) } func TestStripeRoundTrip(t *testing.T) { @@ -96,16 +88,11 @@ func TestStripeRoundTrip(t *testing.T) { // Read it back and merge. outBuf := &bytes.Buffer{} written, err := io.Copy(outBuf, stripeReader(readers, blockSize)) - if err != nil { - t.Errorf("dataSize=%d, blockSize=%d, stripes=%d; copy error: %v", dataSize, blockSize, stripes, err) - } - if written != dataSize { - t.Errorf("dataSize=%d, blockSize=%d, stripes=%d; copy error: wrote %d total bytes instead of dataSize", dataSize, blockSize, stripes, written) - } + assert.NoError(t, err) + assert.Equal(t, dataSize, written) + output := outBuf.Bytes() - if !bytes.Equal(input, output) { - t.Errorf("output bytes are not the same as input") - } + assert.Equal(t, input, output) } // Test block size that evenly divides data size. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index 355df8fdd66..e4f55cc2384 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -187,10 +187,7 @@ func TestPrimaryKeyEquivalentColumns(t *testing.T) { require.NoError(t, err, "could not connect to mysqld: %v", err) defer conn.Close() cols, indexName, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, conn.ExecuteFetch, env.Dbcfgs.DBName, tt.table) - if (err != nil) != tt.wantErr { - t.Errorf("Mysqld.GetPrimaryKeyEquivalentColumns() error = %v, wantErr %v", err, tt.wantErr) - return - } + assert.NoError(t, err) require.Equalf(t, cols, tt.wantCols, "Mysqld.GetPrimaryKeyEquivalentColumns() columns = %v, want %v", cols, tt.wantCols) require.Equalf(t, indexName, tt.wantIndex, "Mysqld.GetPrimaryKeyEquivalentColumns() index = %v, want %v", indexName, tt.wantIndex) })