Skip to content

Commit

Permalink
Schema Tracking Refactor: Merge schema-tracking in health-streamer in…
Browse files Browse the repository at this point in the history
…to schema.Engine (#13121)

* test: add test to verify the expectations of health stream

Signed-off-by: Manan Gupta <[email protected]>

* test: augment the test to also test the case where views are enabled in vttablets

Signed-off-by: Manan Gupta <[email protected]>

* feat: use the schema engine notifier to run the reload in health-streamer

Signed-off-by: Manan Gupta <[email protected]>

* feat: deprecate the flag controlling the interval of health-streamer reload

Signed-off-by: Manan Gupta <[email protected]>

* test: augment test to verify schema tracking works for alters in views as well

Signed-off-by: Manan Gupta <[email protected]>

* feat: add a view table type and augment loadTable function to assign it correctly

Signed-off-by: Manan Gupta <[email protected]>

* feat: change health streamer to use the tables got from the schemaEngine instead of detecting the schema changes on its own

Signed-off-by: Manan Gupta <[email protected]>

* feat: make the schema engine capable of storing the table and view information in a database

Signed-off-by: Manan Gupta <[email protected]>

* feat: remove unused functions

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix health streamer tests

Signed-off-by: Manan Gupta <[email protected]>

* test: fix tests by also deleting the tablet record for the tablets that are removed

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix deadlocks and incorrect writes from health-streamer

Signed-off-by: Manan Gupta <[email protected]>

* feat: improve how we start health streamer and engine

Signed-off-by: Manan Gupta <[email protected]>

* feat: use the newly introduced field

Signed-off-by: Manan Gupta <[email protected]>

* feat: use sqlparser.string to get the escaped table name

Signed-off-by: Manan Gupta <[email protected]>

* test: fix the default configuration test's expectation

Signed-off-by: Manan Gupta <[email protected]>

* test: add the new tables to the expected output of tests

Signed-off-by: Manan Gupta <[email protected]>

* test: fix test expectation to match MySQL 8.0 behaviour

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix schema change test

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix state manager test

Signed-off-by: Manan Gupta <[email protected]>

* feat: use the same flag for the context in schema-engine

Signed-off-by: Manan Gupta <[email protected]>

* test: refactor test to use a single stream call

Signed-off-by: Manan Gupta <[email protected]>

* feat: revert changes to json marshal in config

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix health stream timeout test

Signed-off-by: Manan Gupta <[email protected]>

* feat: refactor engine notifier to send tables instead of strings

Signed-off-by: Manan Gupta <[email protected]>

* feat: add tests for all the functions in the db.go file and fix a couple of them

Signed-off-by: Manan Gupta <[email protected]>

* feat: refactor code and fix a couple of bugs

Signed-off-by: Manan Gupta <[email protected]>

* test: add more tests for the reload database logic

Signed-off-by: Manan Gupta <[email protected]>

* test: fix schema version test

Signed-off-by: Manan Gupta <[email protected]>

* test: added more tests for all the schema.engine functions

Signed-off-by: Manan Gupta <[email protected]>

* test: fix engine test flakiness

Signed-off-by: Manan Gupta <[email protected]>

* test: add health-streamer test

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix data race in fakedb

Signed-off-by: Manan Gupta <[email protected]>

* test: add header comments to tests

Signed-off-by: Manan Gupta <[email protected]>

* feat: rename views table columns to table_schema and table_name

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix detectViewChange query

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix the summary

Signed-off-by: Manan Gupta <[email protected]>

---------

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored May 31, 2023
1 parent a4b8e26 commit 9e892eb
Show file tree
Hide file tree
Showing 38 changed files with 2,822 additions and 515 deletions.
7 changes: 7 additions & 0 deletions changelog/17.0/17.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [Support for MySQL 8.0 `binlog_transaction_compression`](#binlog-compression)
- **[VTTablet](#vttablet)**
- [VTTablet: Initializing all replicas with super_read_only](#vttablet-initialization)
- [Vttablet Schema Reload Timeout](#vttablet-schema-reload-timeout)
- [Settings pool enabled](#settings-pool)
- **[VReplication](#VReplication)**
- [Support for the `noblob` binlog row image mode](#noblob)
Expand Down Expand Up @@ -319,6 +320,10 @@ This is even more important if you are running Vitess on the vitess-operator.
You must ensure your `init_db.sql` is up-to-date with the new default for `v17.0.0`.
The default file can be found in `./config/init_db.sql`.
#### <a id="vttablet-schema-reload-timeout"/> Vttablet Schema Reload Timeout
A new flag, `--schema-change-reload-timeout` has been added to timeout the reload of the schema that Vttablet does periodically. This is required because sometimes this operation can get stuck after MySQL restarts, etc. More details available in the issue https://github.com/vitessio/vitess/issues/13001.
#### <a id="settings-pool"/> Settings Pool
This was introduced in v15 and it enables pooling the connection with modified connection settings.
To know more what it does read the [v15 release notes](https://github.com/vitessio/vitess/releases/tag/v15.0.0) or the [blog](https://vitess.io/blog/2023-03-27-connection-pooling-in-vitess/) or [docs](https://vitess.io/docs/17.0/reference/query-serving/reserved-conn/)
Expand Down Expand Up @@ -385,6 +390,8 @@ This could be a breaking change for grpc api users based on how they have implem
`schema_change_check_interval` now **only** accepts Go duration values. This affects `vtctld`.
* The flag `durability_policy` is no longer used by vtctld. Instead it reads the durability policies for all keyspaces from the topology server.
* The flag `use_super_read_only` is deprecated and will be removed in a later release. This affects `vttablet`.
* The flag `queryserver-config-schema-change-signal-interval` is deprecated and will be removed in a later release. This affects `vttablet`.
Schema-tracking has been refactored in this release to not use polling anymore, therefore the signal interval isn't required anymore.
In `vttablet` various flags that took float values as seconds have updated to take the standard duration syntax as well.
Float-style parsing is now deprecated and will be removed in a later release.
Expand Down
3 changes: 1 addition & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ Usage of vttablet:
--queryserver-config-query-pool-waiter-cap int query server query pool waiter limit, this is the maximum number of queries that can be queued waiting to get a connection (default 5000)
--queryserver-config-query-timeout duration query server query timeout (in seconds), this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s)
--queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true)
--queryserver-config-schema-change-signal-interval duration query server schema change signal interval defines at which interval the query server shall send schema updates to vtgate. (default 5s)
--queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s)
--queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768)
--queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200)
Expand Down Expand Up @@ -269,7 +268,7 @@ Usage of vttablet:
--s3_backup_storage_root string root prefix for all backup-related object names.
--s3_backup_tls_skip_verify_cert skip the 'certificate is valid' check for SSL connections.
--sanitize_log_messages Remove potentially sensitive information in tablet INFO, WARNING, and ERROR log messages such as query parameters.
--schema-change-reload-timeout duration query server schema change signal reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
Expand Down
23 changes: 21 additions & 2 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type DB struct {
// if fakesqldb is asked to serve queries or query patterns that it has not been explicitly told about it will
// error out by default. However if you set this flag then any unmatched query results in an empty result
neverFail atomic.Bool

// lastError stores the last error in returning a query result.
lastErrorMu sync.Mutex
lastError error
}

// QueryHandler is the interface used by the DB to simulate executed queries
Expand Down Expand Up @@ -176,6 +180,7 @@ func New(t testing.TB) *DB {
connections: make(map[uint32]*mysql.Conn),
queryPatternUserCallback: make(map[*regexp.Regexp]func(string)),
patternData: make(map[string]exprResult),
lastErrorMu: sync.Mutex{},
}

db.Handler = db
Expand Down Expand Up @@ -245,6 +250,13 @@ func (db *DB) CloseAllConnections() {
}
}

// LastError gives the last error the DB ran into
func (db *DB) LastError() error {
db.lastErrorMu.Lock()
defer db.lastErrorMu.Unlock()
return db.lastError
}

// WaitForClose should be used after CloseAllConnections() is closed and
// you want to provoke a MySQL client error with errno 2006.
//
Expand Down Expand Up @@ -342,7 +354,14 @@ func (db *DB) WarningCount(c *mysql.Conn) uint16 {
}

// HandleQuery is the default implementation of the QueryHandler interface
func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) (err error) {
defer func() {
if err != nil {
db.lastErrorMu.Lock()
db.lastError = err
db.lastErrorMu.Unlock()
}
}()
if db.allowAll.Load() {
return callback(&sqltypes.Result{})
}
Expand Down Expand Up @@ -413,7 +432,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
return callback(&sqltypes.Result{})
}
// Nothing matched.
err := fmt.Errorf("fakesqldb:: query: '%s' is not supported on %v",
err = fmt.Errorf("fakesqldb:: query: '%s' is not supported on %v",
sqlparser.TruncateForUI(query), db.name)
log.Errorf("Query not found: %s", sqlparser.TruncateForUI(query))

Expand Down
40 changes: 40 additions & 0 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,46 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
return responses, nil
}

// StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
// returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
tablet, err := cluster.VtctlclientGetTablet(vttablet)
if err != nil {
return err
}

conn, err := tabletconn.GetDialer()(tablet, grpcclient.FailFast(false))
if err != nil {
return err
}

conditionSuccess := false
timeoutExceeded := false
go func() {
time.Sleep(timeout)
timeoutExceeded = true
}()

err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if condition(shr) {
conditionSuccess = true
}
if timeoutExceeded || conditionSuccess {
return io.EOF
}
return nil
})

if conditionSuccess {
return nil
}

if timeoutExceeded {
return errors.New("timeout exceed while waiting for the condition in StreamHealth")
}
return err
}

func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func VtctldClientProcessInstance(hostname string, grpcPort int, tmpDirectory str
return vtctldclient
}

// PlannedReparentShard executes vtctlclient command to make specified tablet the primary for the shard.
func (vtctldclient *VtctldClientProcess) PlannedReparentShard(Keyspace string, Shard string, alias string) (err error) {
output, err := vtctldclient.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", Keyspace, Shard),
"--new-primary", alias)
if err != nil {
log.Errorf("error in PlannedReparentShard output %s, err %s", output, err.Error())
}
return err
}

// CreateKeyspace executes the vtctl command to create a keyspace
func (vtctldclient *VtctldClientProcess) CreateKeyspace(keyspaceName string, sidecarDBName string) (err error) {
var output string
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/custom_rule_topo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ func TestTopoCustomRule(t *testing.T) {
// Reset the VtTabletExtraArgs
clusterInstance.VtTabletExtraArgs = []string{}
// Tear down custom processes
killTablets(t, rTablet)
killTablets(rTablet)
}
1 change: 1 addition & 0 deletions go/test/endtoend/tabletmanager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func TestMain(m *testing.M) {
// List of users authorized to execute vschema ddl operations
clusterInstance.VtGateExtraArgs = []string{
"--vschema_ddl_authorized_users=%",
"--enable-views",
"--discovery_low_replication_lag", tabletUnhealthyThreshold.String(),
}
// Set extra tablet args for lock timeout
Expand Down
147 changes: 143 additions & 4 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/strings/slices"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -87,13 +87,15 @@ func TestTabletReshuffle(t *testing.T) {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Backup", rTablet.Alias)
assert.Error(t, err, "cannot perform backup without my.cnf")

killTablets(t, rTablet)
killTablets(rTablet)
}

func TestHealthCheck(t *testing.T) {
// Add one replica that starts not initialized
defer cluster.PanicHandler(t)
ctx := context.Background()
clusterInstance.DisableVTOrcRecoveries(t)
defer clusterInstance.EnableVTOrcRecoveries(t)

rTablet := clusterInstance.NewVttabletInstance("replica", 0, "")

Expand Down Expand Up @@ -192,7 +194,141 @@ func TestHealthCheck(t *testing.T) {
}

// Manual cleanup of processes
killTablets(t, rTablet)
killTablets(rTablet)
}

// TestHealthCheckSchemaChangeSignal tests the tables and views, which report their schemas have changed in the output of a StreamHealth.
func TestHealthCheckSchemaChangeSignal(t *testing.T) {
// Add one replica that starts not initialized
defer cluster.PanicHandler(t)
ctx := context.Background()

vtParams := clusterInstance.GetVTParams(keyspaceName)
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

// Make sure the primary is the primary when the test starts.
// This state should be ensured before we actually test anything.
checkTabletType(t, primaryTablet.Alias, "PRIMARY")

// Run a bunch of DDL queries and verify that the tables/views changed show up in the health stream.
// These tests are for the part where `--queryserver-enable-views` flag is not set.
verifyHealthStreamSchemaChangeSignals(t, conn, &primaryTablet, false)

// We start a new vttablet, this time with `--queryserver-enable-views` flag specified.
tempTablet := clusterInstance.NewVttabletInstance("replica", 0, "")
// Start Mysql Processes and return connection
_, err = cluster.StartMySQLAndGetConnection(ctx, tempTablet, username, clusterInstance.TmpDirectory)
require.NoError(t, err)
oldArgs := clusterInstance.VtTabletExtraArgs
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver-enable-views")
defer func() {
clusterInstance.VtTabletExtraArgs = oldArgs
}()
// start vttablet process, should be in SERVING state as we already have a primary.
err = clusterInstance.StartVttablet(tempTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

defer func() {
// Restore the primary tablet back to the original.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shardName, primaryTablet.Alias)
require.NoError(t, err)
// Manual cleanup of processes
killTablets(tempTablet)
}()

// Now we reparent the cluster to the new tablet we have.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shardName, tempTablet.Alias)
require.NoError(t, err)

checkTabletType(t, tempTablet.Alias, "PRIMARY")
// Run a bunch of DDL queries and verify that the tables/views changed show up in the health stream.
// These tests are for the part where `--queryserver-enable-views` flag is set.
verifyHealthStreamSchemaChangeSignals(t, conn, tempTablet, true)
}

func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
var streamErr error
wg := sync.WaitGroup{}
wg.Add(1)
ranOnce := false
finished := false
ch := make(chan *querypb.StreamHealthResponse)
go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(context.Background(), primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
// If we are finished, then close the channel and end the stream.
if finished {
close(ch)
return true
}
// Put the response in the channel.
ch <- shr
return false
})
}()
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
time.Sleep(1 * time.Second)
}

verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE VIEW v2 as select * from t1", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "ALTER TABLE `area` ADD COLUMN name varchar(30) NOT NULL", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area2`", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "ALTER VIEW v2 as select id from t1", viewsEnabled)
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP VIEW v2", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area`", "area")

finished = true
wg.Wait()
require.NoError(t, streamErr)
}

func verifyTableDDLSchemaChangeSignal(t *testing.T, vtgateConn *mysql.Conn, ch chan *querypb.StreamHealthResponse, query string, table string) {
_, err := vtgateConn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)

timeout := time.After(15 * time.Second)
for {
select {
case shr := <-ch:
if shr != nil && shr.RealtimeStats != nil && slices.Contains(shr.RealtimeStats.TableSchemaChanged, table) {
return
}
case <-timeout:
t.Errorf("didn't get the correct tables changed in stream response until timeout")
}
}
}

func verifyViewDDLSchemaChangeSignal(t *testing.T, vtgateConn *mysql.Conn, ch chan *querypb.StreamHealthResponse, query string, viewsEnabled bool) {
_, err := vtgateConn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)

timeout := time.After(15 * time.Second)
for {
select {
case shr := <-ch:
listToUse := shr.RealtimeStats.TableSchemaChanged
if viewsEnabled {
listToUse = shr.RealtimeStats.ViewSchemaChanged
}
if shr != nil && shr.RealtimeStats != nil && slices.Contains(listToUse, "v2") {
return
}
case <-timeout:
t.Errorf("didn't get the correct views changed in stream response until timeout")
}
}
}

func checkHealth(t *testing.T, port int, shouldError bool) {
Expand Down Expand Up @@ -247,6 +383,8 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {

//Wait if tablet is not in service state
defer cluster.PanicHandler(t)
clusterInstance.DisableVTOrcRecoveries(t)
defer clusterInstance.EnableVTOrcRecoveries(t)
err := rdonlyTablet.VttabletProcess.WaitForTabletStatus("SERVING")
require.NoError(t, err)

Expand Down Expand Up @@ -284,14 +422,15 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
checkHealth(t, rdonlyTablet.HTTPPort, false)
}

func killTablets(t *testing.T, tablets ...*cluster.Vttablet) {
func killTablets(tablets ...*cluster.Vttablet) {
var wg sync.WaitGroup
for _, tablet := range tablets {
wg.Add(1)
go func(tablet *cluster.Vttablet) {
defer wg.Done()
_ = tablet.VttabletProcess.TearDown()
_ = tablet.MysqlctlProcess.Stop()
_ = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", tablet.Alias)
}(tablet)
}
wg.Wait()
Expand Down
Loading

0 comments on commit 9e892eb

Please sign in to comment.