diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index f4319799061..6d12d638bcf 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -376,6 +376,8 @@ Usage of vttablet: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go index 460a5427358..7f00aa8bc66 100644 --- a/go/vt/vttablet/flags.go +++ b/go/vt/vttablet/flags.go @@ -29,6 +29,11 @@ const ( var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage +var ( + VReplicationNetReadTimeout = 300 + VReplicationNetWriteTimeout = 600 +) + func init() { servenv.OnParseFor("vttablet", registerFlags) } @@ -36,4 +41,6 @@ func init() { func registerFlags(fs *pflag.FlagSet) { fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags, "(Bitmask) of experimental features in vreplication to enable") + fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds") + fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 8f9974a5424..ea1026cdf54 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/vt/vttablet" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/vt/discovery" @@ -243,6 +245,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil { return err } + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil { + return err + } + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil { + return err + } // We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid. if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 31e13427af3..5c083ea746d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -23,6 +23,8 @@ import ( "time" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/vttablet" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" @@ -119,6 +121,12 @@ func (rs *rowStreamer) Stream() error { if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { + return err + } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { + return err + } return rs.streamQuery(conn, rs.send) }