From a1249d1dbd3607dbc293527772b666497c0bf8b2 Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Fri, 6 Oct 2023 08:18:34 -0400 Subject: [PATCH 1/5] set vreplication net read and net write timeout session vars to high values Signed-off-by: Olga Shestopalova --- go/vt/vttablet/tabletmanager/vreplication/controller.go | 6 ++++++ go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | 6 ++++++ go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 94e4741eeee..a431a35cd60 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -227,6 +227,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil { return err } + if _, err := dbClient.ExecuteFetch("set @@session.net_read_timeout = 300", 10000); err != nil { + return err + } + if _, err := dbClient.ExecuteFetch("set @@session.net_write_timeout = 600", 10000); err != nil { + return err + } // We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid. if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 25a5cd7bf87..b1a25929145 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -137,6 +137,12 @@ func (rs *rowStreamer) Stream() error { if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } + if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil { + return err + } + if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil { + return err + } } return rs.streamQuery(rs.send) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go index 1039f21b4d6..d46519dbd3d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go @@ -108,6 +108,12 @@ func (ts *tableStreamer) Stream() error { if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } + if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil { + return err + } + if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil { + return err + } rs, err := conn.ExecuteFetch("show tables", -1, true) if err != nil { From 597651a55f69ea94dbb95e8dd37c262496fd8aaa Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Fri, 6 Oct 2023 08:46:54 -0400 Subject: [PATCH 2/5] fix vreplication test Signed-off-by: Olga Shestopalova --- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index ce7e85ac495..069f5977fe9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -60,6 +60,8 @@ const ( getAutoIncrementStep = "select @@session.auto_increment_increment" setSessionTZ = "set @@session.time_zone = '+00:00'" setNames = "set names 'binary'" + setNetReadTimeout = "set @@session.net_read_timeout = 300" + setNetWriteTimeout = "set @@session.net_write_timeout = 600" getBinlogRowImage = "select @@binlog_row_image" insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'" getVReplicationRecord = "select * from _vt.vreplication where id = 1" @@ -324,6 +326,8 @@ func TestMoveTables(t *testing.T) { ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(getRowsCopied, sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -895,6 +899,8 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(getRowsCopied, sqltypes.MakeTestResult( sqltypes.MakeTestFields( From c6aba00ac9aa9d6d1ce387a1123412806ca3970c Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Tue, 10 Oct 2023 15:11:44 -0400 Subject: [PATCH 3/5] make net read/write timeout values configurable Signed-off-by: Olga Shestopalova --- go/flags/endtoend/vtcombo.txt | 2 ++ go/flags/endtoend/vttablet.txt | 2 ++ go/vt/vttablet/flags.go | 9 +++++++-- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 7 ++++--- go/vt/vttablet/tabletmanager/vreplication/controller.go | 5 +++-- go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | 5 +++-- go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go | 5 +++-- 7 files changed, 24 insertions(+), 11 deletions(-) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 4ed61f95080..f452b2844da 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -405,6 +405,8 @@ Flags: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 3b3f5142c97..61b315f923a 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -411,6 +411,8 @@ Flags: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go index 460a5427358..a5d7d61747e 100644 --- a/go/vt/vttablet/flags.go +++ b/go/vt/vttablet/flags.go @@ -18,7 +18,6 @@ package vttablet import ( "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/servenv" ) @@ -27,7 +26,11 @@ const ( VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2) ) -var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage +var ( + VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage + VReplicationNetReadTimeout = 300 + VReplicationNetWriteTimeout = 600 +) func init() { servenv.OnParseFor("vttablet", registerFlags) @@ -36,4 +39,6 @@ func init() { func registerFlags(fs *pflag.FlagSet) { fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags, "(Bitmask) of experimental features in vreplication to enable") + fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds") + fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds") } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 069f5977fe9..f4df9e2a31c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -24,6 +24,7 @@ import ( "runtime/debug" "strings" "testing" + "vitess.io/vitess/go/vt/vttablet" "github.com/stretchr/testify/require" @@ -60,8 +61,6 @@ const ( getAutoIncrementStep = "select @@session.auto_increment_increment" setSessionTZ = "set @@session.time_zone = '+00:00'" setNames = "set names 'binary'" - setNetReadTimeout = "set @@session.net_read_timeout = 300" - setNetWriteTimeout = "set @@session.net_write_timeout = 600" getBinlogRowImage = "select @@binlog_row_image" insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'" getVReplicationRecord = "select * from _vt.vreplication where id = 1" @@ -86,7 +85,9 @@ var ( }, }, } - position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) + position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) + setNetReadTimeout = fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout) + setNetWriteTimeout = fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout) ) // TestCreateVReplicationWorkflow tests the query generated diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index a431a35cd60..700b25e202f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -23,6 +23,7 @@ import ( "strings" "sync/atomic" "time" + "vitess.io/vitess/go/vt/vttablet" "google.golang.org/protobuf/encoding/prototext" @@ -227,10 +228,10 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil { return err } - if _, err := dbClient.ExecuteFetch("set @@session.net_read_timeout = 300", 10000); err != nil { + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil { return err } - if _, err := dbClient.ExecuteFetch("set @@session.net_write_timeout = 600", 10000); err != nil { + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil { return err } // We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid. diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index b1a25929145..566c273d3cb 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" "time" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" @@ -137,10 +138,10 @@ func (rs *rowStreamer) Stream() error { if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { return err } } diff --git a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go index d46519dbd3d..9c01ac12c93 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" "sync/atomic" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" @@ -108,10 +109,10 @@ func (ts *tableStreamer) Stream() error { if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { return err } From 2cf25f8da3e76d614a11e9dd2e1c547147c992db Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Tue, 10 Oct 2023 15:33:57 -0400 Subject: [PATCH 4/5] fix imports Signed-off-by: Olga Shestopalova --- go/vt/vttablet/flags.go | 1 + go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 1 + go/vt/vttablet/tabletmanager/vreplication/controller.go | 1 + go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go | 1 + go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go | 1 + 5 files changed, 5 insertions(+) diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go index a5d7d61747e..87453e2ebcd 100644 --- a/go/vt/vttablet/flags.go +++ b/go/vt/vttablet/flags.go @@ -18,6 +18,7 @@ package vttablet import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/servenv" ) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index f4df9e2a31c..98731ca2a14 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -24,6 +24,7 @@ import ( "runtime/debug" "strings" "testing" + "vitess.io/vitess/go/vt/vttablet" "github.com/stretchr/testify/require" diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 700b25e202f..3262dca9bc6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -23,6 +23,7 @@ import ( "strings" "sync/atomic" "time" + "vitess.io/vitess/go/vt/vttablet" "google.golang.org/protobuf/encoding/prototext" diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 566c273d3cb..f595f680d4d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" "time" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/mysql/collations" diff --git a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go index 9c01ac12c93..0bbd265435b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" "sync/atomic" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/sqlescape" From fc19d19708816bafdde90713d753003e983eea3a Mon Sep 17 00:00:00 2001 From: Olga Shestopalova Date: Tue, 10 Oct 2023 16:09:46 -0400 Subject: [PATCH 5/5] fix flags Signed-off-by: Olga Shestopalova --- go/flags/endtoend/vtcombo.txt | 2 -- go/flags/endtoend/vttablet.txt | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index f452b2844da..4ed61f95080 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -405,8 +405,6 @@ Flags: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence - --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) - --vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 61b315f923a..30fe5e41172 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -412,7 +412,7 @@ Flags: --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) - --vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600) + --vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table