Skip to content

Commit

Permalink
vreplication timeout query optimizer hints (#13840)
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Shestopalova <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Olga Shestopalova <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
3 people authored Oct 13, 2023
1 parent 7314dfb commit 0adaf78
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 47 deletions.
3 changes: 3 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,14 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_healthcheck_retry_delay duration healthcheck retry delay (default 5s)
--vreplication_healthcheck_timeout duration healthcheck retry delay (default 1m0s)
--vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
--vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600)
--vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s)
--vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s)
--vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package vttablet

import (
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/servenv"
Expand All @@ -31,15 +33,19 @@ var (
VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
VReplicationNetReadTimeout = 300
VReplicationNetWriteTimeout = 600
CopyPhaseDuration = 1 * time.Hour
)

// init passes registerFlags to servenv.OnParseFor once for each of the
// command names "vttablet" and "vtcombo", in that order.
func init() {
	for _, cmd := range []string{"vttablet", "vtcombo"} {
		servenv.OnParseFor(cmd, registerFlags)
	}
}

// registerFlags binds the package-level VReplication settings to their
// command-line flags on fs. Each variable's current value is passed as the
// flag's default.
func registerFlags(fs *pflag.FlagSet) {
	fs.Int64Var(
		&VReplicationExperimentalFlags,
		"vreplication_experimental_flags",
		VReplicationExperimentalFlags,
		"(Bitmask) of experimental features in vreplication to enable",
	)

	// Session-level network timeouts, expressed in seconds.
	fs.IntVar(
		&VReplicationNetReadTimeout,
		"vreplication_net_read_timeout",
		VReplicationNetReadTimeout,
		"Session value of net_read_timeout for vreplication, in seconds",
	)
	fs.IntVar(
		&VReplicationNetWriteTimeout,
		"vreplication_net_write_timeout",
		VReplicationNetWriteTimeout,
		"Session value of net_write_timeout for vreplication, in seconds",
	)

	fs.DurationVar(
		&CopyPhaseDuration,
		"vreplication_copy_phase_duration",
		CopyPhaseDuration,
		"Duration for each copy phase loop (before running the next catchup: default 1h)",
	)
}
2 changes: 0 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ var (
relayLogMaxSize = 250000
relayLogMaxItems = 5000

copyPhaseDuration = 1 * time.Hour
replicaLagTolerance = 1 * time.Minute

vreplicationHeartbeatUpdateInterval = 1
Expand All @@ -53,7 +52,6 @@ func registerVReplicationFlags(fs *pflag.FlagSet) {
fs.IntVar(&relayLogMaxSize, "relay_log_max_size", relayLogMaxSize, "Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time.")
fs.IntVar(&relayLogMaxItems, "relay_log_max_items", relayLogMaxItems, "Maximum number of rows for VReplication target buffering.")

fs.DurationVar(&copyPhaseDuration, "vreplication_copy_phase_duration", copyPhaseDuration, "Duration for each copy phase loop (before running the next catchup: default 1h)")
fs.DurationVar(&replicaLagTolerance, "vreplication_replica_lag_tolerance", replicaLagTolerance, "Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase")

// vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated if there are no real events on the source and the source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package vreplication
import (
"fmt"

"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet"
)

// isBitSet returns true if the bit at index is set
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/bytes2"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/pools"
Expand Down Expand Up @@ -393,7 +395,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
return fmt.Errorf("plan not found for table: %s, current plans are: %#v", tableName, plan.TargetTables)
}

ctx, cancel := context.WithTimeout(ctx, copyPhaseDuration)
ctx, cancel := context.WithTimeout(ctx, vttablet.CopyPhaseDuration)
defer cancel()

var lastpkpb *querypb.QueryResult
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"strconv"
"time"

"vitess.io/vitess/go/vt/vttablet"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -77,7 +79,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
return err
}

ctx, cancel := context.WithTimeout(ctx, copyPhaseDuration)
ctx, cancel := context.WithTimeout(ctx, vttablet.CopyPhaseDuration)
defer cancel()

rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval)
Expand Down
38 changes: 19 additions & 19 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func testPlayerCopyCharPK(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

savedCopyPhaseDuration := copyPhaseDuration
savedCopyPhaseDuration := vttablet.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
copyPhaseDuration = 500 * time.Millisecond
defer func() { copyPhaseDuration = savedCopyPhaseDuration }()
vttablet.CopyPhaseDuration = 500 * time.Millisecond
defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time should be very low to cause the wait loop to execute multiple times.
Expand Down Expand Up @@ -203,10 +203,10 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

savedCopyPhaseDuration := copyPhaseDuration
savedCopyPhaseDuration := vttablet.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
copyPhaseDuration = 500 * time.Millisecond
defer func() { copyPhaseDuration = savedCopyPhaseDuration }()
vttablet.CopyPhaseDuration = 500 * time.Millisecond
defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time should be very low to cause the wait loop to execute multiple times.
Expand Down Expand Up @@ -326,10 +326,10 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

savedCopyPhaseDuration := copyPhaseDuration
savedCopyPhaseDuration := vttablet.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
copyPhaseDuration = 500 * time.Millisecond
defer func() { copyPhaseDuration = savedCopyPhaseDuration }()
vttablet.CopyPhaseDuration = 500 * time.Millisecond
defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time should be very low to cause the wait loop to execute multiple times.
Expand Down Expand Up @@ -674,10 +674,10 @@ func testPlayerCopyBigTable(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

savedCopyPhaseDuration := copyPhaseDuration
savedCopyPhaseDuration := vttablet.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
copyPhaseDuration = 500 * time.Millisecond
defer func() { copyPhaseDuration = savedCopyPhaseDuration }()
vttablet.CopyPhaseDuration = 500 * time.Millisecond
defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time should be very low to cause the wait loop to execute multiple times.
Expand Down Expand Up @@ -805,10 +805,10 @@ func testPlayerCopyWildcardRule(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

savedCopyPhaseDuration := copyPhaseDuration
savedCopyPhaseDuration := vttablet.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
copyPhaseDuration = 500 * time.Millisecond
defer func() { copyPhaseDuration = savedCopyPhaseDuration }()
vttablet.CopyPhaseDuration = 500 * time.Millisecond
defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }()

savedWaitRetryTime := waitRetryTime
// waitRetry time should be very low to cause the wait loop to execute multiple times.
Expand Down Expand Up @@ -1522,14 +1522,14 @@ func testPlayerCopyTableCancel(t *testing.T) {
})
env.SchemaEngine.Reload(context.Background())

saveTimeout := copyPhaseDuration
copyPhaseDuration = 1 * time.Millisecond
defer func() { copyPhaseDuration = saveTimeout }()
saveTimeout := vttablet.CopyPhaseDuration
vttablet.CopyPhaseDuration = 1 * time.Millisecond
defer func() { vttablet.CopyPhaseDuration = saveTimeout }()

// Set a hook to reset the copy timeout after first call.
vstreamRowsHook = func(ctx context.Context) {
<-ctx.Done()
copyPhaseDuration = saveTimeout
vttablet.CopyPhaseDuration = saveTimeout
vstreamRowsHook = nil
}

Expand Down
9 changes: 6 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqlescape"
Expand All @@ -38,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand Down Expand Up @@ -261,7 +260,7 @@ func (rs *rowStreamer) buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, err
func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error) {
buf := sqlparser.NewTrackedBuffer(nil)
// We could have used select *, but being explicit is more predictable.
buf.Myprintf("select ")
buf.Myprintf("select %s", GetVReplicationMaxExecutionTimeQueryHint())
prefix := ""
for _, col := range rs.plan.Table.Fields {
if rs.plan.isConvertColumnUsingUTF8(col.Name) {
Expand Down Expand Up @@ -470,3 +469,7 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)

return nil
}

// GetVReplicationMaxExecutionTimeQueryHint returns a MAX_EXECUTION_TIME
// optimizer hint comment, followed by a single trailing space, whose argument
// is vttablet.CopyPhaseDuration expressed in whole milliseconds.
func GetVReplicationMaxExecutionTimeQueryHint() string {
	timeoutMs := vttablet.CopyPhaseDuration.Milliseconds()
	return fmt.Sprintf("/*+ MAX_EXECUTION_TIME(%v) */ ", timeoutMs)
}
Loading

0 comments on commit 0adaf78

Please sign in to comment.