Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXPERIMENTAL] VPlayer parallel applier #17370

Draft
wants to merge 43 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c6f7c2e
VReplication parallel applier; read sequence number and last committe…
shlomi-noach Dec 10, 2024
b8fefdf
last commited and sequence number must not change even if applying mu…
shlomi-noach Dec 11, 2024
b2f0da8
generate CI for onlineddl_vrepl_bench
shlomi-noach Dec 11, 2024
72e0f9f
cluster_endtoend_onlineddl_vrepl_bench.yml
shlomi-noach Dec 11, 2024
847fd52
increase benchmark count and time
shlomi-noach Dec 11, 2024
52c48df
increase wait timeout
shlomi-noach Dec 11, 2024
9812005
wait for a substantial number of results
shlomi-noach Dec 15, 2024
e716c29
Many improvements:
shlomi-noach Dec 15, 2024
19e07b4
Decorate VEvent with MustSave
shlomi-noach Dec 15, 2024
b90f5e3
populate and use MustSave
shlomi-noach Dec 15, 2024
587f929
workers use vplayer's pos. Fix dbClient.Begin() in VEventType_ROW
shlomi-noach Dec 15, 2024
11ed86d
generalize vplayer.updatePos, use by worker
shlomi-noach Dec 15, 2024
478d8d6
addressing synchronization issues
shlomi-noach Dec 16, 2024
c0035ee
VEventType: support PREVIOUS_GTIDS
shlomi-noach Dec 17, 2024
4bfee23
produce and accept binlogdatapb.VEventType_PREVIOUS_GTIDS
shlomi-noach Dec 17, 2024
0fde032
more synchronization adjustements
shlomi-noach Dec 17, 2024
be2acbc
more concurrency fixes. Everything looks to be correct at this time.
shlomi-noach Dec 18, 2024
05438d9
more debug info
shlomi-noach Dec 18, 2024
7587329
remvoe debug info
shlomi-noach Dec 18, 2024
7aa016e
stricter check for pos timestamp
shlomi-noach Dec 18, 2024
99e8e59
support batched mode in parallel mode
shlomi-noach Dec 31, 2024
905b991
a tiny bit of unit testing. Much more to go
shlomi-noach Dec 31, 2024
92e8493
nil check (relevant for tests)
shlomi-noach Dec 31, 2024
d295511
combine batch and parallel
shlomi-noach Dec 31, 2024
c39e5a7
re-adding CI workflows
shlomi-noach Dec 31, 2024
4093857
resolved conflict
shlomi-noach Dec 31, 2024
815a42f
Added missing workflow files
shlomi-noach Dec 31, 2024
a3aa3a0
Fix EOF check
shlomi-noach Jan 1, 2025
2729850
better check for EOF
shlomi-noach Jan 1, 2025
120ab81
check for worker error on availableWorker()
shlomi-noach Jan 1, 2025
e054980
check for posReached before applying any event
shlomi-noach Jan 1, 2025
b295b89
use worker's dbClient in vr.setState; use same context for worker poo…
shlomi-noach Jan 1, 2025
a75b425
setDBClientSettings for worker's dbclient
shlomi-noach Jan 1, 2025
b3e9fdc
ensure storing pos on EOF
shlomi-noach Jan 1, 2025
79d07b3
only setDBClientSettings if filter != nil
shlomi-noach Jan 1, 2025
5f04217
apply SQL mode for workers dbclient
shlomi-noach Jan 1, 2025
088bf7d
close worker connections on drain (do not reset, do not recycle)
shlomi-noach Jan 1, 2025
80142de
remove excessive applyFunc param
shlomi-noach Jan 1, 2025
9fe3a46
no need for drain() to close the conneciton, as it is already clodes …
shlomi-noach Jan 1, 2025
e8679c6
safer checks for EOF
shlomi-noach Jan 1, 2025
fd86928
do not commit is pos is reached
shlomi-noach Jan 1, 2025
61c8f5f
restore original relay log buffering logic: wait for just one event
shlomi-noach Jan 1, 2025
b204ca6
safer checks for EOF
shlomi-noach Jan 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions .github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"

name: Cluster (onlineddl_vrepl_bench)
on: [push, pull_request]
concurrency:
group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (onlineddl_vrepl_bench)')
cancel-in-progress: true

permissions: read-all

env:
LAUNCHABLE_ORGANIZATION: "vitess"
LAUNCHABLE_WORKSPACE: "vitess-app"
GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}"

jobs:
build:
name: Run endtoend tests on Cluster (onlineddl_vrepl_bench)
runs-on: gh-hosted-runners-16cores-1-24.04

steps:
- name: Skip CI
run: |
if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then
echo "skipping CI due to the 'Skip CI' label"
exit 1
fi

- name: Check if workflow needs to be skipped
id: skip-workflow
run: |
skip='false'
if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then
skip='true'
fi
echo Skip ${skip}
echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT

PR_DATA=$(curl -s\
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}")
draft=$(echo "$PR_DATA" | jq .draft -r)
echo "is_draft=${draft}" >> $GITHUB_OUTPUT

- name: Check out code
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7

- name: Check for changes in relevant files
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: dorny/paths-filter@ebc4d7e9ebcb0b1eb21480bb8f43113e996ac77a # v3.0.1
id: changes
with:
token: ''
filters: |
end_to_end:
- 'go/**/*.go'
- 'go/vt/sidecardb/**/*.sql'
- 'go/test/endtoend/onlineddl/vrepl_suite/**'
- 'test.go'
- 'Makefile'
- 'build.env'
- 'go.sum'
- 'go.mod'
- 'proto/*.proto'
- 'tools/**'
- 'config/**'
- 'bootstrap.sh'
- '.github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml'
- 'go/test/endtoend/onlineddl/vrepl_suite/testdata'

- name: Set up Go
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
with:
go-version-file: go.mod

- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1

- name: Tune the OS
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |
# Limit local port range to not use ports that overlap with server side
# ports that we listen on.
sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535"
# Increase the asynchronous non-blocking I/O. More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio
echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p /etc/sysctl.conf

- name: Get dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |

# Get key to latest MySQL repo
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys A8D3785C
# Setup MySQL 8.0
wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.33-1_all.deb
echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections
sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config*
sudo apt-get -qq update

# We have to install this old version of libaio1 in case we end up testing with MySQL 5.7. See also:
# https://bugs.launchpad.net/ubuntu/+source/libaio/+bug/2067501
curl -L -O http://mirrors.kernel.org/ubuntu/pool/main/liba/libaio/libaio1_0.3.112-13build1_amd64.deb
sudo dpkg -i libaio1_0.3.112-13build1_amd64.deb
# libtinfo5 is also needed for older MySQL 5.7 builds.
curl -L -O http://mirrors.kernel.org/ubuntu/pool/universe/n/ncurses/libtinfo5_6.3-2ubuntu0.1_amd64.deb
sudo dpkg -i libtinfo5_6.3-2ubuntu0.1_amd64.deb

# Install everything else we need, and configure
sudo apt-get -qq install -y mysql-server mysql-shell mysql-client make unzip g++ etcd-client etcd-server curl git wget eatmydata xz-utils libncurses6

sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download

# install JUnit report formatter
go install github.com/vitessio/go-junit-report@HEAD

- name: Setup launchable dependencies
if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main'
run: |
# Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up
pip3 install --user launchable~=1.0 > /dev/null

# verify that launchable setup is all correct.
launchable verify || true

# Tell Launchable about the build you are producing and testing
launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source .

- name: Run cluster endtoend test
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 45
run: |
# We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file
# which musn't be more than 107 characters long.
export VTDATAROOT="/tmp/"
source build.env

set -exo pipefail

cat <<-EOF>>./config/mycnf/mysql8026.cnf
binlog-transaction-compression=ON
EOF

cat <<-EOF>>./config/mycnf/mysql8026.cnf
binlog-row-value-options=PARTIAL_JSON
EOF

# run the tests however you normally do, then produce a JUnit XML file
eatmydata -- go run test.go -docker=false -follow -shard onlineddl_vrepl_bench | tee -a output.txt | go-junit-report -set-exit-code > report.xml

- name: Print test output and Record test result in launchable if PR is not a draft
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always()
run: |
if [[ "${{steps.skip-workflow.outputs.is_draft}}" == "false" ]]; then
# send recorded tests to launchable
launchable record tests --build "$GITHUB_RUN_ID" go-test . || true
fi

# print test output
cat output.txt

- name: Test Summary
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always()
uses: test-summary/action@31493c76ec9e7aa675f1585d3ed6f1da69269a86 # v2.4
with:
paths: "report.xml"
show: "fail"
2 changes: 1 addition & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type BinlogEvent interface {
// GTID returns the GTID from the event, and if this event
// also serves as a BEGIN statement.
// This is only valid if IsGTID() returns true.
GTID(BinlogFormat) (replication.GTID, bool, error)
GTID(BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error)
// Query returns a Query struct representing data from a QUERY_EVENT.
// This is only valid if IsQuery() returns true.
Query(BinlogFormat) (Query, error)
Expand Down
12 changes: 6 additions & 6 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func newFilePosBinlogEvent(buf []byte) *filePosBinlogEvent {
return &filePosBinlogEvent{binlogEvent: binlogEvent(buf)}
}

func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return nil, false, nil
func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return nil, false, 0, 0, nil
}

// IsSemiSyncAckRequested implements BinlogEvent.IsSemiSyncAckRequested().
Expand Down Expand Up @@ -223,8 +223,8 @@ func (ev filePosFakeEvent) Format() (BinlogFormat, error) {
return BinlogFormat{}, nil
}

func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return nil, false, nil
func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return nil, false, 0, 0, nil
}

func (ev filePosFakeEvent) Query(BinlogFormat) (Query, error) {
Expand Down Expand Up @@ -303,6 +303,6 @@ func (ev filePosGTIDEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, e
return ev, nil, nil
}

func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return ev.gtid, false, nil
func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return ev.gtid, false, 0, 0, nil
}
4 changes: 2 additions & 2 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
event, _, err := event.StripChecksum(f)
require.NoError(t, err, "StripChecksum failed: %v", err)

gtid, hasBegin, err := event.GTID(f)
gtid, hasBegin, _, _, err := event.GTID(f)
require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err)
require.True(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.")

Expand All @@ -178,7 +178,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
event, _, err = event.StripChecksum(f)
require.NoError(t, err, "StripChecksum failed: %v", err)

gtid, hasBegin, err = event.GTID(f)
gtid, hasBegin, _, _, err = event.GTID(f)
require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err)
require.False(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.")

Expand Down
7 changes: 4 additions & 3 deletions go/mysql/binlog_event_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,18 @@ func (ev mariadbBinlogEvent) IsGTID() bool {
// 8 sequence number
// 4 domain ID
// 1 flags2
func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, int64, int64, error) {
const FLStandalone = 1

data := ev.Bytes()[f.HeaderLength:]
flags2 := data[8+4]

return replication.MariadbGTID{
gtid := replication.MariadbGTID{
Sequence: binary.LittleEndian.Uint64(data[:8]),
Domain: binary.LittleEndian.Uint32(data[8 : 8+4]),
Server: ev.ServerID(),
}, flags2&FLStandalone == 0, nil
}
return gtid, flags2&FLStandalone == 0, 0, 0, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/binlog_event_mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestMariadbNotBeginGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)}
want := false
if _, got, err := input.GTID(f); got != want {
if _, got, _, _, err := input.GTID(f); got != want {
t.Errorf("%#v.GTID() = %v (%v), want %v", input, got, err, want)
}
}
Expand All @@ -88,7 +88,7 @@ func TestMariadbIsBeginGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)}
want := true
if _, got, err := input.GTID(f); got != want {
if _, got, _, _, err := input.GTID(f); got != want {
t.Errorf("%#v.IsBeginGTID() = %v (%v), want %v", input, got, err, want)
}
}
Expand All @@ -102,7 +102,7 @@ func TestMariadbStandaloneBinlogEventGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)}
want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 9}
got, hasBegin, err := input.GTID(f)
got, hasBegin, _, _, err := input.GTID(f)
assert.NoError(t, err, "unexpected error: %v", err)
assert.False(t, hasBegin, "unexpected hasBegin")
assert.True(t, reflect.DeepEqual(got, want), "%#v.GTID() = %#v, want %#v", input, got, want)
Expand All @@ -118,7 +118,7 @@ func TestMariadbBinlogEventGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)}
want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 10}
got, hasBegin, err := input.GTID(f)
got, hasBegin, _, _, err := input.GTID(f)
assert.NoError(t, err, "unexpected error: %v", err)
assert.True(t, hasBegin, "unexpected !hasBegin")
assert.True(t, reflect.DeepEqual(got, want), "%#v.GTID() = %#v, want %#v", input, got, want)
Expand Down
23 changes: 19 additions & 4 deletions go/mysql/binlog_event_mysql56.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,27 @@ func (ev mysql56BinlogEvent) IsGTID() bool {
// 1 flags
// 16 SID (server UUID)
// 8 GNO (sequence number, signed int)
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
// 1 lt_type
// 8 last_committed
// 8 sequence_number
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error) {
data := ev.Bytes()[f.HeaderLength:]
var sid replication.SID
copy(sid[:], data[1:1+16])
gno := int64(binary.LittleEndian.Uint64(data[1+16 : 1+16+8]))
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, nil
pos := 1
copy(sid[:], data[pos:pos+16])
pos += 16 // end of SID
gno := int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
pos += 8 // end of GNO
pos += 1 // end of lt_type
if len(data) >= pos+8 {
lastCommitted = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
}
pos += 8 // end of last_committed
if len(data) >= pos+8 {
sequenceNumber = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
}
// pos += 8 // end of sequence_number
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, lastCommitted, sequenceNumber, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
Expand Down
6 changes: 4 additions & 2 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// Sample event data for MySQL 5.6.
var (
mysql56FormatEvent = NewMysql56BinlogEvent([]byte{0x78, 0x4e, 0x49, 0x55, 0xf, 0x64, 0x0, 0x0, 0x0, 0x74, 0x0, 0x0, 0x0, 0x78, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x36, 0x2e, 0x32, 0x34, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x78, 0x4e, 0x49, 0x55, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x1, 0x18, 0x4a, 0xf, 0xca})
mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27})
mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* lt_type: */, 0x0 /* last_committed: */, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* sequence_number: */, 0x9, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27})
mysql56QueryEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
mysql56SemiSyncNoAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x00, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
mysql56SemiSyncAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x01, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
Expand Down Expand Up @@ -90,10 +90,12 @@ func TestMysql56GTID(t *testing.T) {
Server: replication.SID{0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a},
Sequence: 4,
}
got, hasBegin, err := input.GTID(format)
got, hasBegin, lastCommitted, sequenceNumber, err := input.GTID(format)
require.NoError(t, err, "GTID() error: %v", err)
assert.False(t, hasBegin, "GTID() returned hasBegin")
assert.Equal(t, want, got, "GTID() = %#v, want %#v", got, want)
assert.Equal(t, int64(7), lastCommitted)
assert.Equal(t, int64(9), sequenceNumber)
}

func TestMysql56DecodeTransactionPayload(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestRowReplicationWithRealDatabase(t *testing.T) {
switch {
case be.IsGTID():
// We expect one of these at least.
gtid, hasBegin, err := be.GTID(f)
gtid, hasBegin, _, _, err := be.GTID(f)
if err != nil {
t.Fatalf("GTID event is broken: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions go/mysql/replication/mysql56_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ func ParseSID(s string) (sid SID, err error) {
type Mysql56GTID struct {
// Server is the SID of the server that originally committed the transaction.
Server SID
// Sequence is the sequence number of the transaction within a given Server's
// scope.
// Sequence is the sequence number of the transaction within a given Server's scope.
Sequence int64
}

Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"math/rand/v2"
"net/http"
"os"
"path"
"strings"
Expand All @@ -33,6 +32,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

Expand Down Expand Up @@ -207,7 +207,7 @@ func TestRevertSchemaChanges(t *testing.T) {
require.Equal(t, 1, len(shards))

throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, http.StatusOK, time.Minute)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute)

t.Run("revertible", testRevertible)
t.Run("revert", testRevert)
Expand Down
Loading
Loading