Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Tracking Refactor: Merge schema-tracking in health-streamer into schema.Engine #13121

Merged
merged 42 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4e75fe9
test: add test to verify the expectations of health stream
GuptaManan100 May 18, 2023
122c5e6
test: augment the test to also test the case where views are enabled …
GuptaManan100 May 18, 2023
714a975
feat: use the schema engine notifier to run the reload in health-stre…
GuptaManan100 May 19, 2023
36758f2
feat: deprecate the flag controlling the interval of health-streamer …
GuptaManan100 May 19, 2023
9923f76
test: augment test to verify schema tracking works for alters in view…
GuptaManan100 May 19, 2023
a950d15
feat: add a view table type and augment loadTable function to assign …
GuptaManan100 May 19, 2023
69c3554
feat: change health streamer to use the tables got from the schemaEng…
GuptaManan100 May 19, 2023
9a8202e
feat: make the schema engine capable of storing the table and view in…
GuptaManan100 May 22, 2023
b4a7f95
feat: remove unused functions
GuptaManan100 May 22, 2023
5643738
feat: fix health streamer tests
GuptaManan100 May 23, 2023
90e0b56
test: fix tests by also deleting the tablet record for the tablets th…
GuptaManan100 May 23, 2023
05b4b82
feat: fix deadlocks and incorrect writes from health-streamer
GuptaManan100 May 24, 2023
43056d7
feat: improve how we start health streamer and engine
GuptaManan100 May 24, 2023
5bb9291
feat: use the newly introduced field
GuptaManan100 May 24, 2023
a7e6159
feat: use sqlparser.string to get the escaped table name
GuptaManan100 May 24, 2023
41c5d2a
test: fix the default configuration test's expectation
GuptaManan100 May 24, 2023
c47b88b
test: add the new tables to the expected output of tests
GuptaManan100 May 24, 2023
7f56125
test: fix test expectation to match MySQL 8.0 behaviour
GuptaManan100 May 24, 2023
e5272fb
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 24, 2023
25814d1
feat: fix schema change test
GuptaManan100 May 25, 2023
0d2ac63
feat: fix state manager test
GuptaManan100 May 25, 2023
7699de2
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 25, 2023
42e8ca4
feat: use the same flag for the context in schema-engine
GuptaManan100 May 25, 2023
32ab02a
test: refactor test to use a single stream call
GuptaManan100 May 25, 2023
aea8396
feat: revert changes to json marshal in config
GuptaManan100 May 25, 2023
707a460
feat: fix health stream timeout test
GuptaManan100 May 25, 2023
dfdb469
feat: refactor engine notifier to send tables instead of strings
GuptaManan100 May 25, 2023
16a4acf
feat: add tests for all the functions in the db.go file and fix a cou…
GuptaManan100 May 26, 2023
54943b2
feat: refactor code and fix a couple of bugs
GuptaManan100 May 26, 2023
877dc48
test: add more tests for the reload database logic
GuptaManan100 May 26, 2023
5ebe65d
test: fix schema version test
GuptaManan100 May 26, 2023
94f8c49
test: added more tests for all the schema.engine functions
GuptaManan100 May 26, 2023
f7b496e
test: fix engine test flakiness
GuptaManan100 May 29, 2023
c8d891e
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 29, 2023
96cec2c
test: add health-streamer test
GuptaManan100 May 29, 2023
f778e0b
feat: fix data race in fakedb
GuptaManan100 May 29, 2023
ff01bf9
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 30, 2023
dcff98a
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 30, 2023
993fa6b
test: add header comments to tests
GuptaManan100 May 30, 2023
05d3546
feat: rename views table columns to table_schema and table_name
GuptaManan100 May 30, 2023
ee7562d
feat: fix detectViewChange query
GuptaManan100 May 30, 2023
e00653f
feat: fix the summary
GuptaManan100 May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/17.0/17.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [Support for MySQL 8.0 `binlog_transaction_compression`](#binlog-compression)
- **[VTTablet](#vttablet)**
- [VTTablet: Initializing all replicas with super_read_only](#vttablet-initialization)
- [Vttablet Schema Reload Timeout](#vttablet-schema-reload-timeout)
- [Settings pool enabled](#settings-pool)
- **[VReplication](#VReplication)**
- [Support for the `noblob` binlog row image mode](#noblob)
Expand Down Expand Up @@ -316,6 +317,10 @@ This is even more important if you are running Vitess on the vitess-operator.
You must ensure your `init_db.sql` is up-to-date with the new default for `v17.0.0`.
The default file can be found in `./config/init_db.sql`.

#### <a id="vttablet-schema-reload-timeout"/> Vttablet Schema Reload Timeout

A new flag, `--schema-change-reload-timeout` has been added to timeout the reload of the schema that Vttablet does periodically. This is required because sometimes this operation can get stuck after MySQL restarts, etc.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved

#### <a id="settings-pool"/> Settings Pool
This was introduced in v15 and it enables pooling the connection with modified connection settings.
To know more what it does read the [v15 release notes](https://github.com/vitessio/vitess/releases/tag/v15.0.0) or the [blog](https://vitess.io/blog/2023-03-27-connection-pooling-in-vitess/) or [docs](https://vitess.io/docs/17.0/reference/query-serving/reserved-conn/)
Expand Down Expand Up @@ -395,6 +400,10 @@ Affected flags and YAML config keys:
- `shutdown_grace_period`
- `unhealthy_threshold`

The flag `queryserver-config-schema-change-signal-interval` is deprecated and will be removed in a later release.
Schema-tracking has been refactored in this release to not use polling anymore, therefore the signal interval isn't required anymore.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved


#### <a id="deprecated-stats"/>Deprecated Stats

These stats are deprecated in v17.
Expand Down
3 changes: 1 addition & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ Usage of vttablet:
--queryserver-config-query-pool-waiter-cap int query server query pool waiter limit, this is the maximum number of queries that can be queued waiting to get a connection (default 5000)
--queryserver-config-query-timeout duration query server query timeout (in seconds), this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s)
--queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true)
--queryserver-config-schema-change-signal-interval duration query server schema change signal interval defines at which interval the query server shall send schema updates to vtgate. (default 5s)
--queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s)
--queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768)
--queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200)
Expand Down Expand Up @@ -269,7 +268,7 @@ Usage of vttablet:
--s3_backup_storage_root string root prefix for all backup-related object names.
--s3_backup_tls_skip_verify_cert skip the 'certificate is valid' check for SSL connections.
--sanitize_log_messages Remove potentially sensitive information in tablet INFO, WARNING, and ERROR log messages such as query parameters.
--schema-change-reload-timeout duration query server schema change signal reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
Expand Down
23 changes: 21 additions & 2 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type DB struct {
// if fakesqldb is asked to serve queries or query patterns that it has not been explicitly told about it will
// error out by default. However if you set this flag then any unmatched query results in an empty result
neverFail atomic.Bool

// lastError stores the last error in returning a query result.
lastErrorMu sync.Mutex
lastError error
}

// QueryHandler is the interface used by the DB to simulate executed queries
Expand Down Expand Up @@ -176,6 +180,7 @@ func New(t testing.TB) *DB {
connections: make(map[uint32]*mysql.Conn),
queryPatternUserCallback: make(map[*regexp.Regexp]func(string)),
patternData: make(map[string]exprResult),
lastErrorMu: sync.Mutex{},
}

db.Handler = db
Expand Down Expand Up @@ -245,6 +250,13 @@ func (db *DB) CloseAllConnections() {
}
}

// LastError gives the last error the DB ran into
func (db *DB) LastError() error {
db.lastErrorMu.Lock()
defer db.lastErrorMu.Unlock()
return db.lastError
}

// WaitForClose should be used after CloseAllConnections() is closed and
// you want to provoke a MySQL client error with errno 2006.
//
Expand Down Expand Up @@ -342,7 +354,14 @@ func (db *DB) WarningCount(c *mysql.Conn) uint16 {
}

// HandleQuery is the default implementation of the QueryHandler interface
func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) (err error) {
defer func() {
if err != nil {
db.lastErrorMu.Lock()
db.lastError = err
db.lastErrorMu.Unlock()
}
}()
if db.allowAll.Load() {
return callback(&sqltypes.Result{})
}
Expand Down Expand Up @@ -413,7 +432,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
return callback(&sqltypes.Result{})
}
// Nothing matched.
err := fmt.Errorf("fakesqldb:: query: '%s' is not supported on %v",
err = fmt.Errorf("fakesqldb:: query: '%s' is not supported on %v",
sqlparser.TruncateForUI(query), db.name)
log.Errorf("Query not found: %s", sqlparser.TruncateForUI(query))

Expand Down
40 changes: 40 additions & 0 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,46 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
return responses, nil
}

// StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
// returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are existing usages of StreamTabletHealth can those also use this utility function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an inherent difference between the two. StreamTabletHealth gets you specified number of stream responses back, which can then be processed. On the other hand StreamTabletHealthUntil is meant to be used where the test doesn't care if we consume more responses and only verifies if a certain check is ever true.
The tests currently using StreamTabletHealth actually can't really use StreamTabletHealthUntil because they want to run a test on the next packet, and not an eventual packet.

tablet, err := cluster.VtctlclientGetTablet(vttablet)
if err != nil {
return err
}

conn, err := tabletconn.GetDialer()(tablet, grpcclient.FailFast(false))
if err != nil {
return err
}

conditionSuccess := false
timeoutExceeded := false
go func() {
time.Sleep(timeout)
timeoutExceeded = true
}()

err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if condition(shr) {
conditionSuccess = true
}
if timeoutExceeded || conditionSuccess {
return io.EOF
}
return nil
})

if conditionSuccess {
return nil
}

if timeoutExceeded {
return errors.New("timeout exceed while waiting for the condition in StreamHealth")
}
return err
}

func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func VtctldClientProcessInstance(hostname string, grpcPort int, tmpDirectory str
return vtctldclient
}

// PlannedReparentShard executes vtctlclient command to make specified tablet the primary for the shard.
func (vtctldclient *VtctldClientProcess) PlannedReparentShard(Keyspace string, Shard string, alias string) (err error) {
output, err := vtctldclient.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", Keyspace, Shard),
"--new-primary", alias)
if err != nil {
log.Errorf("error in PlannedReparentShard output %s, err %s", output, err.Error())
Comment on lines +97 to +103
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are other places as well we are doing this call, like InitializeShard can it be reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but eventually we want to move to using vtctldclient and not vtctlclient. So I thought it prudent to add a PRS call to vtctldclient. This is what we should be using going forward in all our tests.

}
return err
}

// CreateKeyspace executes the vtctl command to create a keyspace
func (vtctldclient *VtctldClientProcess) CreateKeyspace(keyspaceName string, sidecarDBName string) (err error) {
var output string
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/custom_rule_topo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ func TestTopoCustomRule(t *testing.T) {
// Reset the VtTabletExtraArgs
clusterInstance.VtTabletExtraArgs = []string{}
// Tear down custom processes
killTablets(t, rTablet)
killTablets(rTablet)
}
1 change: 1 addition & 0 deletions go/test/endtoend/tabletmanager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func TestMain(m *testing.M) {
// List of users authorized to execute vschema ddl operations
clusterInstance.VtGateExtraArgs = []string{
"--vschema_ddl_authorized_users=%",
"--enable-views",
"--discovery_low_replication_lag", tabletUnhealthyThreshold.String(),
}
// Set extra tablet args for lock timeout
Expand Down
147 changes: 143 additions & 4 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/strings/slices"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -87,13 +87,15 @@ func TestTabletReshuffle(t *testing.T) {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Backup", rTablet.Alias)
assert.Error(t, err, "cannot perform backup without my.cnf")

killTablets(t, rTablet)
killTablets(rTablet)
}

func TestHealthCheck(t *testing.T) {
// Add one replica that starts not initialized
defer cluster.PanicHandler(t)
ctx := context.Background()
clusterInstance.DisableVTOrcRecoveries(t)
defer clusterInstance.EnableVTOrcRecoveries(t)

rTablet := clusterInstance.NewVttabletInstance("replica", 0, "")

Expand Down Expand Up @@ -192,7 +194,141 @@ func TestHealthCheck(t *testing.T) {
}

// Manual cleanup of processes
killTablets(t, rTablet)
killTablets(rTablet)
}

// TestHealthCheckSchemaChangeSignal tests the tables and views, which report their schemas have changed in the output of a StreamHealth.
func TestHealthCheckSchemaChangeSignal(t *testing.T) {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// Add one replica that starts not initialized
defer cluster.PanicHandler(t)
ctx := context.Background()

vtParams := clusterInstance.GetVTParams(keyspaceName)
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

// Make sure the primary is the primary when the test starts.
// This state should be ensured before we actually test anything.
checkTabletType(t, primaryTablet.Alias, "PRIMARY")

// Run a bunch of DDL queries and verify that the tables/views changed show up in the health stream.
// These tests are for the part where `--queryserver-enable-views` flag is not set.
verifyHealthStreamSchemaChangeSignals(t, conn, &primaryTablet, false)

// We start a new vttablet, this time with `--queryserver-enable-views` flag specified.
tempTablet := clusterInstance.NewVttabletInstance("replica", 0, "")
// Start Mysql Processes and return connection
_, err = cluster.StartMySQLAndGetConnection(ctx, tempTablet, username, clusterInstance.TmpDirectory)
require.NoError(t, err)
oldArgs := clusterInstance.VtTabletExtraArgs
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver-enable-views")
defer func() {
clusterInstance.VtTabletExtraArgs = oldArgs
}()
// start vttablet process, should be in SERVING state as we already have a primary.
err = clusterInstance.StartVttablet(tempTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

defer func() {
// Restore the primary tablet back to the original.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shardName, primaryTablet.Alias)
require.NoError(t, err)
// Manual cleanup of processes
killTablets(tempTablet)
}()

// Now we reparent the cluster to the new tablet we have.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shardName, tempTablet.Alias)
require.NoError(t, err)

checkTabletType(t, tempTablet.Alias, "PRIMARY")
// Run a bunch of DDL queries and verify that the tables/views changed show up in the health stream.
// These tests are for the part where `--queryserver-enable-views` flag is set.
verifyHealthStreamSchemaChangeSignals(t, conn, tempTablet, true)
}

func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
var streamErr error
wg := sync.WaitGroup{}
wg.Add(1)
ranOnce := false
finished := false
ch := make(chan *querypb.StreamHealthResponse)
go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(context.Background(), primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
// If we are finished, then close the channel and end the stream.
if finished {
close(ch)
return true
}
// Put the response in the channel.
ch <- shr
return false
})
}()
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
time.Sleep(1 * time.Second)
}

verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE VIEW v2 as select * from t1", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "ALTER TABLE `area` ADD COLUMN name varchar(30) NOT NULL", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area2`", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "ALTER VIEW v2 as select id from t1", viewsEnabled)
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP VIEW v2", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area`", "area")

finished = true
wg.Wait()
require.NoError(t, streamErr)
}

func verifyTableDDLSchemaChangeSignal(t *testing.T, vtgateConn *mysql.Conn, ch chan *querypb.StreamHealthResponse, query string, table string) {
_, err := vtgateConn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)

timeout := time.After(15 * time.Second)
for {
select {
case shr := <-ch:
if shr != nil && shr.RealtimeStats != nil && slices.Contains(shr.RealtimeStats.TableSchemaChanged, table) {
return
}
case <-timeout:
t.Errorf("didn't get the correct tables changed in stream response until timeout")
}
}
}

func verifyViewDDLSchemaChangeSignal(t *testing.T, vtgateConn *mysql.Conn, ch chan *querypb.StreamHealthResponse, query string, viewsEnabled bool) {
_, err := vtgateConn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)

timeout := time.After(15 * time.Second)
for {
select {
case shr := <-ch:
listToUse := shr.RealtimeStats.TableSchemaChanged
if viewsEnabled {
listToUse = shr.RealtimeStats.ViewSchemaChanged
}
if shr != nil && shr.RealtimeStats != nil && slices.Contains(listToUse, "v2") {
return
}
case <-timeout:
t.Errorf("didn't get the correct views changed in stream response until timeout")
}
}
}

func checkHealth(t *testing.T, port int, shouldError bool) {
Expand Down Expand Up @@ -247,6 +383,8 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {

//Wait if tablet is not in service state
defer cluster.PanicHandler(t)
clusterInstance.DisableVTOrcRecoveries(t)
defer clusterInstance.EnableVTOrcRecoveries(t)
err := rdonlyTablet.VttabletProcess.WaitForTabletStatus("SERVING")
require.NoError(t, err)

Expand Down Expand Up @@ -284,14 +422,15 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
checkHealth(t, rdonlyTablet.HTTPPort, false)
}

func killTablets(t *testing.T, tablets ...*cluster.Vttablet) {
func killTablets(tablets ...*cluster.Vttablet) {
var wg sync.WaitGroup
for _, tablet := range tablets {
wg.Add(1)
go func(tablet *cluster.Vttablet) {
defer wg.Done()
_ = tablet.VttabletProcess.TearDown()
_ = tablet.MysqlctlProcess.Stop()
_ = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", tablet.Alias)
}(tablet)
}
wg.Wait()
Expand Down
Loading