diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 4ed61f95080..71c11c54088 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -400,11 +400,14 @@ Flags: --vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s) --vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000) --vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200) + --vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3) --vreplication_healthcheck_retry_delay duration healthcheck retry delay (default 5s) --vreplication_healthcheck_timeout duration healthcheck retry delay (default 1m0s) --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go index 87453e2ebcd..3ce2cd3b378 100644 --- a/go/vt/vttablet/flags.go +++ b/go/vt/vttablet/flags.go @@ -17,6 +17,8 @@ limitations under the License. package vttablet import ( + "time" + "github.com/spf13/pflag" "vitess.io/vitess/go/vt/servenv" @@ -31,10 +33,13 @@ var ( VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage VReplicationNetReadTimeout = 300 VReplicationNetWriteTimeout = 600 + CopyPhaseDuration = 1 * time.Hour ) func init() { servenv.OnParseFor("vttablet", registerFlags) + servenv.OnParseFor("vtcombo", registerFlags) + } func registerFlags(fs *pflag.FlagSet) { @@ -42,4 +47,5 @@ func registerFlags(fs *pflag.FlagSet) { "(Bitmask) of experimental features in vreplication to enable") fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds") fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds") + fs.DurationVar(&CopyPhaseDuration, "vreplication_copy_phase_duration", CopyPhaseDuration, "Duration for each copy phase loop (before running the next catchup: default 1h)") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/flags.go b/go/vt/vttablet/tabletmanager/vreplication/flags.go index 7456c51e524..44f07f87a0f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/flags.go +++ b/go/vt/vttablet/tabletmanager/vreplication/flags.go @@ -33,7 +33,6 @@ var ( relayLogMaxSize = 250000 relayLogMaxItems = 5000 - copyPhaseDuration = 1 * time.Hour replicaLagTolerance = 1 * time.Minute vreplicationHeartbeatUpdateInterval = 1 @@ -53,7 +52,6 @@ func registerVReplicationFlags(fs *pflag.FlagSet) { fs.IntVar(&relayLogMaxSize, "relay_log_max_size", relayLogMaxSize, "Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time.") fs.IntVar(&relayLogMaxItems, "relay_log_max_items", relayLogMaxItems, "Maximum number of rows for VReplication target buffering.") - fs.DurationVar(©PhaseDuration, "vreplication_copy_phase_duration", copyPhaseDuration, "Duration for each copy phase loop (before running the next catchup: default 1h)") fs.DurationVar(&replicaLagTolerance, "vreplication_replica_lag_tolerance", replicaLagTolerance, "Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase") // vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated if there are no real events on the source and the source diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go index be1242c9288..c6ccb898996 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go @@ -19,14 +19,14 @@ package vreplication import ( "fmt" - "vitess.io/vitess/go/vt/vttablet" - - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet" ) // isBitSet returns true if the bit at index is set diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index ebfe0e22343..2df808c3a77 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -27,6 +27,8 @@ import ( "google.golang.org/protobuf/encoding/prototext" + "vitess.io/vitess/go/vt/vttablet" + "vitess.io/vitess/go/bytes2" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/pools" @@ -393,7 +395,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma return fmt.Errorf("plan not found for table: %s, current plans are: %#v", tableName, plan.TargetTables) } - ctx, cancel := context.WithTimeout(ctx, copyPhaseDuration) + ctx, cancel := context.WithTimeout(ctx, vttablet.CopyPhaseDuration) defer cancel() var lastpkpb *querypb.QueryResult diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 6252690a629..4da072e3955 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -23,6 +23,8 @@ import ( "strconv" "time" + "vitess.io/vitess/go/vt/vttablet" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/sqltypes" @@ -77,7 +79,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings return err } - ctx, cancel := context.WithTimeout(ctx, copyPhaseDuration) + ctx, cancel := context.WithTimeout(ctx, vttablet.CopyPhaseDuration) defer cancel() rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index ff9b9daf00f..82a6d211b4f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -96,10 +96,10 @@ func testPlayerCopyCharPK(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() - savedCopyPhaseDuration := copyPhaseDuration + savedCopyPhaseDuration := vttablet.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. - copyPhaseDuration = 500 * time.Millisecond - defer func() { copyPhaseDuration = savedCopyPhaseDuration }() + vttablet.CopyPhaseDuration = 500 * time.Millisecond + defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }() savedWaitRetryTime := waitRetryTime // waitRetry time should be very low to cause the wait loop to execute multipel times. @@ -203,10 +203,10 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() - savedCopyPhaseDuration := copyPhaseDuration + savedCopyPhaseDuration := vttablet.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. - copyPhaseDuration = 500 * time.Millisecond - defer func() { copyPhaseDuration = savedCopyPhaseDuration }() + vttablet.CopyPhaseDuration = 500 * time.Millisecond + defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }() savedWaitRetryTime := waitRetryTime // waitRetry time should be very low to cause the wait loop to execute multiple times. @@ -326,10 +326,10 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() - savedCopyPhaseDuration := copyPhaseDuration + savedCopyPhaseDuration := vttablet.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. - copyPhaseDuration = 500 * time.Millisecond - defer func() { copyPhaseDuration = savedCopyPhaseDuration }() + vttablet.CopyPhaseDuration = 500 * time.Millisecond + defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }() savedWaitRetryTime := waitRetryTime // waitRetry time should be very low to cause the wait loop to execute multipel times. @@ -674,10 +674,10 @@ func testPlayerCopyBigTable(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() - savedCopyPhaseDuration := copyPhaseDuration + savedCopyPhaseDuration := vttablet.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. - copyPhaseDuration = 500 * time.Millisecond - defer func() { copyPhaseDuration = savedCopyPhaseDuration }() + vttablet.CopyPhaseDuration = 500 * time.Millisecond + defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }() savedWaitRetryTime := waitRetryTime // waitRetry time should be very low to cause the wait loop to execute multiple times. @@ -805,10 +805,10 @@ func testPlayerCopyWildcardRule(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() - savedCopyPhaseDuration := copyPhaseDuration + savedCopyPhaseDuration := vttablet.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. - copyPhaseDuration = 500 * time.Millisecond - defer func() { copyPhaseDuration = savedCopyPhaseDuration }() + vttablet.CopyPhaseDuration = 500 * time.Millisecond + defer func() { vttablet.CopyPhaseDuration = savedCopyPhaseDuration }() savedWaitRetryTime := waitRetryTime // waitRetry time should be very low to cause the wait loop to execute multipel times. @@ -1522,14 +1522,14 @@ func testPlayerCopyTableCancel(t *testing.T) { }) env.SchemaEngine.Reload(context.Background()) - saveTimeout := copyPhaseDuration - copyPhaseDuration = 1 * time.Millisecond - defer func() { copyPhaseDuration = saveTimeout }() + saveTimeout := vttablet.CopyPhaseDuration + vttablet.CopyPhaseDuration = 1 * time.Millisecond + defer func() { vttablet.CopyPhaseDuration = saveTimeout }() // Set a hook to reset the copy timeout after first call. vstreamRowsHook = func(ctx context.Context) { <-ctx.Done() - copyPhaseDuration = saveTimeout + vttablet.CopyPhaseDuration = saveTimeout vstreamRowsHook = nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index f595f680d4d..bd259864981 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -22,8 +22,6 @@ import ( "sync" "time" - "vitess.io/vitess/go/vt/vttablet" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqlescape" @@ -38,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) @@ -261,7 +260,7 @@ func (rs *rowStreamer) buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, err func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error) { buf := sqlparser.NewTrackedBuffer(nil) // We could have used select *, but being explicit is more predictable. - buf.Myprintf("select ") + buf.Myprintf("select %s", GetVReplicationMaxExecutionTimeQueryHint()) prefix := "" for _, col := range rs.plan.Table.Fields { if rs.plan.isConvertColumnUsingUTF8(col.Name) { @@ -470,3 +469,7 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse) return nil } + +func GetVReplicationMaxExecutionTimeQueryHint() string { + return fmt.Sprintf("/*+ MAX_EXECUTION_TIME(%v) */ ", vttablet.CopyPhaseDuration.Milliseconds()) +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index fcc179dbc5b..9828481397b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -71,7 +71,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"1" type:INT64 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 values:"1"} rows:{lengths:1 values:"1"} lastpk:{lengths:1 values:"2"}`, } - wantQuery := "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select 1 from t1", nil, wantQuery, wantStream) // t1: simulates rollup, with non-pk column @@ -79,7 +79,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"1" type:INT64 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"1bbb"} lastpk:{lengths:1 values:"2"}`, } - wantQuery = "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select 1, val from t1", nil, wantQuery, wantStream) // t1: simulates rollup, with pk and non-pk column @@ -87,7 +87,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"1" type:INT64 charset:63} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:3 values:"11aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"12bbb"} lastpk:{lengths:1 values:"2"}`, } - wantQuery = "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select 1, id, val from t1", nil, wantQuery, wantStream) // t1: no pk in select list @@ -95,7 +95,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:3 values:"aaa"} rows:{lengths:3 values:"bbb"} lastpk:{lengths:1 values:"2"}`, } - wantQuery = "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select val from t1", nil, wantQuery, wantStream) // t1: all rows @@ -103,7 +103,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } - wantQuery = "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select * from t1", nil, wantQuery, wantStream) // t1: lastpk=1 @@ -111,7 +111,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } - wantQuery = "select id, val from t1 force index (`PRIMARY`) where (id > 1) order by id" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) where (id > 1) order by id" checkStream(t, "select * from t1", []sqltypes.Value{sqltypes.NewInt64(1)}, wantQuery, wantStream) // t1: different column ordering @@ -119,7 +119,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:3 lengths:1 values:"aaa1"} rows:{lengths:3 lengths:1 values:"bbb2"} lastpk:{lengths:1 values:"2"}`, } - wantQuery = "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select val, id from t1", nil, wantQuery, wantStream) // t2: all rows @@ -127,7 +127,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:3 values:"12aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } - wantQuery = "select id1, id2, val from t2 force index (`PRIMARY`) order by id1, id2" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t2 force index (`PRIMARY`) order by id1, id2" checkStream(t, "select * from t2", nil, wantQuery, wantStream) // t2: lastpk=1,2 @@ -135,7 +135,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } - wantQuery = "select id1, id2, val from t2 force index (`PRIMARY`) where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t2 force index (`PRIMARY`) where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2" checkStream(t, "select * from t2", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, wantQuery, wantStream) // t3: all rows @@ -143,7 +143,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63} pkfields:{name:"val" type:VARBINARY charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } - wantQuery = "select id, val from t3 order by id, val" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t3 order by id, val" checkStream(t, "select * from t3", nil, wantQuery, wantStream) // t3: lastpk: 1,'aaa' @@ -151,7 +151,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63} pkfields:{name:"val" type:VARBINARY charset:63}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } - wantQuery = "select id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val" checkStream(t, "select * from t3", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("aaa")}, wantQuery, wantStream) // t4: all rows @@ -159,7 +159,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"123aaa"} rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } - wantQuery = "select id1, id2, id3, val from t4 force index (`PRIMARY`) order by id1, id2, id3" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, id3, val from t4 force index (`PRIMARY`) order by id1, id2, id3" checkStream(t, "select * from t4", nil, wantQuery, wantStream) // t4: lastpk: 1,2,3 @@ -167,7 +167,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } - wantQuery = "select id1, id2, id3, val from t4 force index (`PRIMARY`) where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, id3, val from t4 force index (`PRIMARY`) where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" checkStream(t, "select * from t4", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2), sqltypes.NewInt64(3)}, wantQuery, wantStream) // t5: No PK, but a PKE @@ -175,7 +175,7 @@ func TestStreamRowsScan(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t5" org_table:"t5" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t5" org_table:"t5" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t5" org_table:"t5" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t5" org_table:"t5" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } - wantQuery = "select id1, id2, id3, val from t5 force index (`id1_id2_id3`) where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" + wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, id3, val from t5 force index (`id1_id2_id3`) where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" checkStream(t, "select * from t5", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2), sqltypes.NewInt64(3)}, wantQuery, wantStream) // t1: test for unsupported integer literal @@ -270,7 +270,7 @@ func TestStreamRowsKeyRange(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} lastpk:{lengths:1 values:"6"}`, } - wantQuery := "select id1, val from t1 force index (`PRIMARY`) order by id1" + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 force index (`PRIMARY`) order by id1" checkStream(t, "select * from t1 where in_keyrange('-80')", nil, wantQuery, wantStream) } @@ -302,7 +302,7 @@ func TestStreamRowsFilterInt(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"5"}`, } - wantQuery := "select id1, id2, val from t1 force index (`PRIMARY`) order by id1" + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, val from t1 force index (`PRIMARY`) order by id1" checkStream(t, "select id1, val from t1 where id2 = 100", nil, wantQuery, wantStream) require.Equal(t, int64(0), engine.rowStreamerNumPackets.Get()) require.Equal(t, int64(2), engine.rowStreamerNumRows.Get()) @@ -335,7 +335,7 @@ func TestStreamRowsFilterVarBinary(t *testing.T) { `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, `rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"6"}`, } - wantQuery := "select id1, val from t1 force index (`PRIMARY`) order by id1" + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, val from t1 force index (`PRIMARY`) order by id1" checkStream(t, "select id1, val from t1 where val = 'newton'", nil, wantQuery, wantStream) } @@ -363,7 +363,7 @@ func TestStreamRowsMultiPacket(t *testing.T) { `rows:{lengths:1 lengths:10 values:"42345678901"} lastpk:{lengths:1 values:"4"}`, `rows:{lengths:1 lengths:1 values:"52"} lastpk:{lengths:1 values:"5"}`, } - wantQuery := "select id, val from t1 force index (`PRIMARY`) order by id" + wantQuery := "select /*+ MAX_EXECUTION_TIME(3600000) */ id, val from t1 force index (`PRIMARY`) order by id" checkStream(t, "select * from t1", nil, wantQuery, wantStream) }