diff --git a/go/cmd/vtcombo/main.go b/go/cmd/vtcombo/main.go index ff52c284216..d55e52a171a 100644 --- a/go/cmd/vtcombo/main.go +++ b/go/cmd/vtcombo/main.go @@ -61,7 +61,13 @@ var ( mysqlPort = flags.Int("mysql_port", 3306, "mysql port") externalTopoServer = flags.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+ "If true, vtcombo will use the flags defined in topo/server.go to open topo server") - plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") + plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") + vschemaPersistenceDir = flags.String("vschema-persistence-dir", "", "If set, per-keyspace vschema will be persisted in this directory "+ + "and reloaded into the in-memory topology server across restarts. Bookkeeping is performed using a simple watcher goroutine. "+ + "This is useful when running vtcombo as an application development container (e.g. vttestserver) where you want to keep the same "+ + "vschema even if developer's machine reboots. This works in tandem with vttestserver's --persistent_mode flag. Needless to say, "+ + "this is neither a perfect nor a production solution for vschema persistence. Consider using the --external_topo_server flag if "+ + "you require a more complete solution. This flag is ignored if --external_topo_server is set.") tpb vttestpb.VTTestTopology ts *topo.Server @@ -293,6 +299,10 @@ func main() { exit.Return(1) } + if *vschemaPersistenceDir != "" && !*externalTopoServer { + startVschemaWatcher(*vschemaPersistenceDir, tpb.Keyspaces, ts) + } + servenv.OnRun(func() { addStatusParts(vtg) }) diff --git a/go/cmd/vtcombo/vschema_watcher.go b/go/cmd/vtcombo/vschema_watcher.go new file mode 100644 index 00000000000..9194f61e774 --- /dev/null +++ b/go/cmd/vtcombo/vschema_watcher.go @@ -0,0 +1,111 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "encoding/json" + "os" + "path" + + "vitess.io/vitess/go/vt/log" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vttestpb "vitess.io/vitess/go/vt/proto/vttest" + "vitess.io/vitess/go/vt/topo" +) + +func startVschemaWatcher(vschemaPersistenceDir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) { + // Create the directory if it doesn't exist. + if err := createDirectoryIfNotExists(vschemaPersistenceDir); err != nil { + log.Fatalf("Unable to create vschema persistence directory %v: %v", vschemaPersistenceDir, err) + } + + // If there are keyspace files, load them. + loadKeyspacesFromDir(vschemaPersistenceDir, keyspaces, ts) + + // Rebuild the SrvVSchema object in case we loaded vschema from file + if err := ts.RebuildSrvVSchema(context.Background(), tpb.Cells); err != nil { + log.Fatalf("RebuildSrvVSchema failed: %v", err) + } + + // Now watch for changes in the SrvVSchema object and persist them to disk. + go watchSrvVSchema(context.Background(), ts, tpb.Cells[0]) +} + +func loadKeyspacesFromDir(dir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) { + for _, ks := range tpb.Keyspaces { + ksFile := path.Join(dir, ks.Name+".json") + if _, err := os.Stat(ksFile); err == nil { + jsonData, err := os.ReadFile(ksFile) + if err != nil { + log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err) + } + + keyspace := &vschemapb.Keyspace{} + err = json.Unmarshal(jsonData, keyspace) + if err != nil { + log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err) + } + + ts.SaveVSchema(context.Background(), ks.Name, keyspace) + log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile) + } + } +} + +func watchSrvVSchema(ctx context.Context, ts *topo.Server, cell string) { + data, ch, err := ts.WatchSrvVSchema(context.Background(), tpb.Cells[0]) + if err != nil { + log.Fatalf("WatchSrvVSchema failed: %v", err) + } + + if data.Err != nil { + log.Fatalf("WatchSrvVSchema could not retrieve initial vschema: %v", data.Err) + } + persistNewSrvVSchema(data.Value) + + for update := range ch { + if update.Err != nil { + log.Errorf("WatchSrvVSchema returned an error: %v", update.Err) + } else { + persistNewSrvVSchema(update.Value) + } + } +} + +func persistNewSrvVSchema(srvVSchema *vschemapb.SrvVSchema) { + for ksName, ks := range srvVSchema.Keyspaces { + jsonBytes, err := json.MarshalIndent(ks, "", " ") + if err != nil { + log.Errorf("Error marshaling keyspace: %v", err) + continue + } + + err = os.WriteFile(path.Join(*vschemaPersistenceDir, ksName+".json"), jsonBytes, 0644) + if err != nil { + log.Errorf("Error writing keyspace file: %v", err) + } + log.Infof("Persisted keyspace %v to %v", ksName, *vschemaPersistenceDir) + } +} + +func createDirectoryIfNotExists(dir string) error { + if _, err := os.Stat(dir); os.IsNotExist(err) { + return os.Mkdir(dir, 0755) + } + return nil +} diff --git a/go/cmd/vttestserver/main.go b/go/cmd/vttestserver/main.go index 6eedf82cdcf..7a50c83c42a 100644 --- a/go/cmd/vttestserver/main.go +++ b/go/cmd/vttestserver/main.go @@ -92,9 +92,9 @@ func registerFlags(fs *pflag.FlagSet) { " vttestserver as a database container in local developer environments. Note"+ " that db migration files (--schema_dir option) and seeding of"+ " random data (--initialize_with_random_data option) will only run during"+ - " cluster startup if the data directory does not already exist. vschema"+ - " migrations are run every time the cluster starts, since persistence"+ - " for the topology server has not been implemented yet") + " cluster startup if the data directory does not already exist. "+ + " Changes to VSchema are persisted across cluster restarts using a simple"+ + " watcher if the --data_dir argument is specified.") fs.BoolVar(&doSeed, "initialize_with_random_data", false, "If this flag is each table-shard will be initialized"+ diff --git a/go/cmd/vttestserver/vttestserver_test.go b/go/cmd/vttestserver/vttestserver_test.go index 0665d5f9c46..9313182d9b8 100644 --- a/go/cmd/vttestserver/vttestserver_test.go +++ b/go/cmd/vttestserver/vttestserver_test.go @@ -81,8 +81,14 @@ func TestPersistentMode(t *testing.T) { cluster, err := startPersistentCluster(dir) assert.NoError(t, err) - // basic sanity checks similar to TestRunsVschemaMigrations + // Add a new "ad-hoc" vindex via vtgate once the cluster is up, to later make sure it is persisted across teardowns + err = addColumnVindex(cluster, "test_keyspace", "alter vschema on persistence_test add vindex my_vdx(id)") + assert.NoError(t, err) + + // Basic sanity checks similar to TestRunsVschemaMigrations + // See go/cmd/vttestserver/data/schema/app_customer/* and go/cmd/vttestserver/data/schema/test_keyspace/* assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) + assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"}) assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"}) // insert some data to ensure persistence across teardowns @@ -111,8 +117,9 @@ func TestPersistentMode(t *testing.T) { defer cluster.TearDown() assert.NoError(t, err) - // rerun our sanity checks to make sure vschema migrations are run during every startup + // rerun our sanity checks to make sure vschema is persisted correctly assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) + assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"}) assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"}) // ensure previous data was successfully persisted @@ -316,7 +323,8 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) { keyspaceArg := "--keyspaces=" + strings.Join(clusterKeyspaces, ",") numShardsArg := "--num_shards=2,2" vschemaDDLAuthorizedUsers := "--vschema_ddl_authorized_users=%" - os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers}...) + alsoLogToStderr := "--alsologtostderr" // better debugging + os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers, alsoLogToStderr}...) os.Args = append(os.Args, flags...) return runCluster() } diff --git a/go/flags/endtoend/vttestserver.txt b/go/flags/endtoend/vttestserver.txt index de7edcc522a..c5c4dd31101 100644 --- a/go/flags/endtoend/vttestserver.txt +++ b/go/flags/endtoend/vttestserver.txt @@ -77,7 +77,7 @@ Usage of vttestserver: --num_shards strings Comma separated shard count (one per keyspace) (default [2]) --onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s) --onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s) - --persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. vschema migrations are run every time the cluster starts, since persistence for the topology server has not been implemented yet + --persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. Changes to VSchema are persisted across cluster restarts using a simple watcher if the --data_dir argument is specified. --pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown. --planner-version string Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the new gen4 planner and falls back to the V3 planner if the gen4 fails. --pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled) diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index 40ba6937e27..2ad059a6235 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -458,28 +458,26 @@ func (db *LocalCluster) loadSchema(shouldRunDatabaseMigrations bool) error { } } - glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql")) - for _, filepath := range glob { - cmds, err := LoadSQLFile(filepath, schemaDir) - if err != nil { - return err - } - - // One single vschema migration per file - if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") { - if err = db.applyVschema(keyspace, cmds[0]); err != nil { + if shouldRunDatabaseMigrations { + glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql")) + for _, filepath := range glob { + cmds, err := LoadSQLFile(filepath, schemaDir) + if err != nil { return err } - continue - } - if !shouldRunDatabaseMigrations { - continue - } + // One single vschema migration per file + if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") { + if err = db.applyVschema(keyspace, cmds[0]); err != nil { + return err + } + continue + } - for _, dbname := range db.shardNames(kpb) { - if err := db.Execute(cmds, dbname); err != nil { - return err + for _, dbname := range db.shardNames(kpb) { + if err := db.Execute(cmds, dbname); err != nil { + return err + } } } } diff --git a/go/vt/vttest/vtprocess.go b/go/vt/vttest/vtprocess.go index 6c5f73b322c..ba86f458ebd 100644 --- a/go/vt/vttest/vtprocess.go +++ b/go/vt/vttest/vtprocess.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "os/exec" + "path" "strings" "syscall" "time" @@ -234,6 +235,9 @@ func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) ( if args.SchemaDir != "" { vt.ExtraArgs = append(vt.ExtraArgs, []string{"--schema_dir", args.SchemaDir}...) } + if args.PersistentMode && args.DataDir != "" { + vt.ExtraArgs = append(vt.ExtraArgs, []string{"--vschema-persistence-dir", path.Join(args.DataDir, "vschema_data")}...) + } if args.TransactionMode != "" { vt.ExtraArgs = append(vt.ExtraArgs, []string{"--transaction_mode", args.TransactionMode}...) }