Skip to content

Commit

Permalink
fix vreplication_test
Browse files Browse the repository at this point in the history
  • Loading branch information
shivnagarajan committed May 10, 2024
1 parent 5cf9f24 commit c3f3417
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,30 +46,32 @@ import (
)

const (
insertVReplicationPrefix = "insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys)"
getWorkflow = "select id from _vt.vreplication where db_name='vt_%s' and workflow='%s'"
checkForWorkflow = "select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'"
checkForFrozenWorkflow = "select 1 from _vt.vreplication where db_name='vt_%s' and message='FROZEN' and workflow_sub_type != 1"
freezeWorkflow = "update _vt.vreplication set message = 'FROZEN' where db_name='vt_%s' and workflow='%s'"
checkForJournal = "/select val from _vt.resharding_journal where id="
getWorkflowStatus = "select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'"
getWorkflowState = "select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=1"
getCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
getNumCopyStateTable = "select count(distinct table_name) from _vt.copy_state where vrepl_id=1"
getLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)"
getAutoIncrementStep = "select @@session.auto_increment_increment"
setSessionTZ = "set @@session.time_zone = '+00:00'"
setNames = "set names 'binary'"
getBinlogRowImage = "select @@binlog_row_image"
insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'"
getVReplicationRecord = "select * from _vt.vreplication where id = 1"
startWorkflow = "update _vt.vreplication set state='Running' where db_name='vt_%s' and workflow='%s'"
stopForCutover = "update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=1"
getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`"
initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'"
updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1`
getRowsCopied = "SELECT rows_copied FROM _vt.vreplication WHERE id=1"
insertVReplicationPrefix = "insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys)"
getWorkflow = "select id from _vt.vreplication where db_name='vt_%s' and workflow='%s'"
checkForWorkflow = "select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'"
checkForFrozenWorkflow = "select 1 from _vt.vreplication where db_name='vt_%s' and message='FROZEN' and workflow_sub_type != 1"
freezeWorkflow = "update _vt.vreplication set message = 'FROZEN' where db_name='vt_%s' and workflow='%s'"
checkForJournal = "/select val from _vt.resharding_journal where id="
getWorkflowStatus = "select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'"
getWorkflowState = "select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=1"
getCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
getNumCopyStateTable = "select count(distinct table_name) from _vt.copy_state where vrepl_id=1"
getLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)"
getAutoIncrementStep = "select @@session.auto_increment_increment"
setSessionTZ = "set @@session.time_zone = '+00:00'"
setNames = "set names 'binary'"
setSessionNetReadTimeout = "set @@session.net_read_timeout = 300"
setSessionNetWriteTimeout = "set @@session.net_write_timeout = 600"
getBinlogRowImage = "select @@binlog_row_image"
insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'"
getVReplicationRecord = "select * from _vt.vreplication where id = 1"
startWorkflow = "update _vt.vreplication set state='Running' where db_name='vt_%s' and workflow='%s'"
stopForCutover = "update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=1"
getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`"
initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'"
updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1`
getRowsCopied = "SELECT rows_copied FROM _vt.vreplication WHERE id=1"
)

var (
Expand Down Expand Up @@ -324,6 +326,8 @@ func TestMoveTables(t *testing.T) {
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionNetReadTimeout, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionNetWriteTimeout, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -911,6 +915,8 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
&sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionNetReadTimeout, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionNetWriteTimeout, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down

0 comments on commit c3f3417

Please sign in to comment.