From 979c1e64d0680420dfd7bf20413a7bf752d9310d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 10 Jul 2024 12:19:58 -0400 Subject: [PATCH] VReplication: Handle large binlog compressed transactions more efficiently (#16328) Signed-off-by: Matt Lord --- go/mysql/binlog_event.go | 9 +- go/mysql/binlog_event_common.go | 2 - go/mysql/binlog_event_compression.go | 283 ++++++++++++------ go/mysql/binlog_event_filepos.go | 4 +- go/mysql/binlog_event_mysql56_test.go | 138 ++++++--- go/mysql/large_compressed_trx_payload.bin | Bin 0 -> 16090 bytes .../tabletserver/vstreamer/vstreamer.go | 12 +- 7 files changed, 309 insertions(+), 139 deletions(-) create mode 100644 go/mysql/large_compressed_trx_payload.bin diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 3acf99c2408..84f92c3809d 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -124,9 +124,12 @@ type BinlogEvent interface { // IsWriteRows(), IsUpdateRows(), or IsDeleteRows() returns // true. Rows(BinlogFormat, *TableMap) (Rows, error) - // TransactionPayload returns a list of BinlogEvents contained - // within the compressed transaction. - TransactionPayload(BinlogFormat) ([]BinlogEvent, error) + // TransactionPayload returns a TransactionPayload type which provides + // a GetNextEvent() method to iterate over the events contained within + // the uncompressed payload. You must call Close() when you are done + // with the TransactionPayload to ensure that the underlying resources + // used are cleaned up. + TransactionPayload(BinlogFormat) (*TransactionPayload, error) // NextLogFile returns the name of the next binary log file & pos. 
// This is only valid if IsRotate() returns true NextLogFile(BinlogFormat) (string, uint64, error) diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index f95ed847e0a..c95873614f0 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -55,8 +55,6 @@ const ( BinlogFixedHeaderLen = 19 // The offset from 0 where the type is stored as 1 byte. BinlogEventTypeOffset = 4 - // Offset from 0 where the 4 byte length is stored. - BinlogEventLenOffset = 9 // Byte length of the checksum suffix when the CRC32 algorithm is used. BinlogCRC32ChecksumLen = 4 ) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 325bfeb4827..378698bc64b 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -19,13 +19,16 @@ package mysql import ( "bytes" "encoding/binary" - "fmt" "io" + "sync" "github.com/klauspost/compress/zstd" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // This file contains code related to handling compression related @@ -41,35 +44,78 @@ const ( payloadUncompressedSizeField ) -// Compression algorithms that are supported (only zstd today -// in MySQL 8.0): -// https://dev.mysql.com/doc/refman/8.0/en/binary-log-transaction-compression.html const ( + // Compression algorithms that are supported (only zstd today + // in MySQL 8.0): + // https://dev.mysql.com/doc/refman/8.0/en/binary-log-transaction-compression.html TransactionPayloadCompressionZstd = 0 TransactionPayloadCompressionNone = 255 + + // Bytes used to store the internal event length as a uint32 at + // the end of the binlog event header. + eventLenBytes = 4 + // Offset from 0 where the eventLenBytes are stored. + binlogEventLenOffset = 9 + // Length of the binlog event header for internal events within + // the transaction payload. 
+ headerLen = binlogEventLenOffset + eventLenBytes + + // At what size should we switch from the in-memory buffer + // decoding to streaming mode which is much slower, but does + // not require everything be done in memory. + zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB ) -var TransactionPayloadCompressionTypes = map[uint64]string{ - TransactionPayloadCompressionZstd: "ZSTD", - TransactionPayloadCompressionNone: "NONE", -} +var ( + TransactionPayloadCompressionTypes = map[uint64]string{ + TransactionPayloadCompressionZstd: "ZSTD", + TransactionPayloadCompressionNone: "NONE", + } -// Create a reader that caches decompressors. This is used for -// smaller events that we want to handle entirely using in-memory -// buffers. -var zstdDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + // Metrics. + compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory") + compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream") + + // A concurrent stateless decoder that caches decompressors. This is + // used for smaller payloads that we want to handle entirely using + // in-memory buffers via DecodeAll. + statelessDecoder *zstd.Decoder + + // A pool of stateful decoders for larger payloads that we want to + // stream. The number of large (> zstdInMemoryDecompressorMaxSize) + // payloads should typically be relatively low, but there may be times + // where there are many of them -- and users like vstreamer may have + // N concurrent streams per tablet which could lead to a lot of + // allocations and GC overhead so this pool allows us to handle + // concurrent cases better while still scaling to 0 when there's no + // usage. 
+ statefulDecoderPool sync.Pool +) -// At what size should we switch from the in-memory buffer -// decoding to streaming mode -- which is slower, but does not -// require everything be done in memory. -const zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB +func init() { + var err error + statelessDecoder, err = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + if err != nil { // Should only happen e.g. due to ENOMEM + log.Errorf("Error creating stateless decoder: %v", err) + } + statefulDecoderPool = sync.Pool{ + New: func() any { + d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) + if err != nil { // Should only happen e.g. due to ENOMEM + log.Errorf("Error creating stateful decoder: %v", err) + } + return d + }, + } +} type TransactionPayload struct { - Size uint64 - CompressionType uint64 - UncompressedSize uint64 - Payload []byte - Events []BinlogEvent + size uint64 + compressionType uint64 + uncompressedSize uint64 + payload []byte + reader io.Reader + iterator func() (BinlogEvent, error) } // IsTransactionPayload returns true if a compressed transaction @@ -78,8 +124,12 @@ func (ev binlogEvent) IsTransactionPayload() bool { return ev.Type() == eTransactionPayloadEvent } -// TransactionPayload returns the BinlogEvents contained within -// the compressed transaction. +// TransactionPayload processes the payload and provides a GetNextEvent() +// method which should be used in a loop to read BinlogEvents one by one +// that were within the compressed transaction. That function will return +// io.EOF when there are no more events left in the payload. You must +// call Close() when you are done with the TransactionPayload to ensure +// that the underlying reader and related resources are cleaned up. 
// The following event types are compressed as part of the // transaction payload: // @@ -129,16 +179,18 @@ func (ev binlogEvent) IsTransactionPayload() bool { // We need to extract the compressed transaction payload from the GTID // event, decompress it with zstd, and then process the internal events // (e.g. Query and Row events) that make up the transaction. -func (ev binlogEvent) TransactionPayload(format BinlogFormat) ([]BinlogEvent, error) { +func (ev binlogEvent) TransactionPayload(format BinlogFormat) (*TransactionPayload, error) { tp := &TransactionPayload{} - if err := tp.Decode(ev.Bytes()[format.HeaderLength:]); err != nil { - return nil, vterrors.Wrapf(err, "error decoding transaction payload event") + if err := tp.process(ev.Bytes()[format.HeaderLength:]); err != nil { + return nil, vterrors.Wrap(err, "error decoding transaction payload event") } - return tp.Events, nil + return tp, nil } -// Decode decodes and decompresses the payload. -func (tp *TransactionPayload) Decode(data []byte) error { +// process reads and decompresses the payload, setting up the iterator +// that can then be used in GetNextEvent() to read the binlog events +// from the uncompressed payload one at a time. +func (tp *TransactionPayload) process(data []byte) error { if err := tp.read(data); err != nil { return err } @@ -147,7 +199,8 @@ func (tp *TransactionPayload) Decode(data []byte) error { // read unmarshalls the transaction payload event into the // TransactionPayload struct. The compressed payload itself will still -// need to be decoded -- meaning decompressing it and extracting the +// need to be decoded -- meaning decompressing it and setting up the +// iterator that can then be used by GetNextEvent() to extract the // internal events. 
func (tp *TransactionPayload) read(data []byte) error { pos := uint64(0) @@ -160,7 +213,7 @@ func (tp *TransactionPayload) read(data []byte) error { pos++ if fieldType == payloadHeaderEndMark { - tp.Payload = data[pos:] + tp.payload = data[pos:] return nil // we're done } @@ -172,17 +225,17 @@ func (tp *TransactionPayload) read(data []byte) error { switch fieldType { case payloadSizeField: - tp.Size, ok = readFixedLenUint64(data[pos : pos+fieldLen]) + tp.size, ok = readFixedLenUint64(data[pos : pos+fieldLen]) if !ok { return vterrors.New(vtrpcpb.Code_INTERNAL, "error reading payload size") } case payloadCompressionTypeField: - tp.CompressionType, ok = readFixedLenUint64(data[pos : pos+fieldLen]) + tp.compressionType, ok = readFixedLenUint64(data[pos : pos+fieldLen]) if !ok { return vterrors.New(vtrpcpb.Code_INTERNAL, "error reading compression type") } case payloadUncompressedSizeField: - tp.UncompressedSize, ok = readFixedLenUint64(data[pos : pos+fieldLen]) + tp.uncompressedSize, ok = readFixedLenUint64(data[pos : pos+fieldLen]) if !ok { return vterrors.New(vtrpcpb.Code_INTERNAL, "error reading uncompressed payload size") } @@ -192,78 +245,126 @@ func (tp *TransactionPayload) read(data []byte) error { } } -// decode decompresses the payload and extracts the internal binlog -// events. +// decode decompresses the payload and assigns the iterator to a +// function that can then be used to retrieve the events from the +// uncompressed transaction one at a time. 
func (tp *TransactionPayload) decode() error { - if tp.CompressionType != TransactionPayloadCompressionZstd { - return vterrors.New(vtrpcpb.Code_INTERNAL, - fmt.Sprintf("TransactionPayload has unsupported compression type of %d", tp.CompressionType)) + if tp.compressionType != TransactionPayloadCompressionZstd { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, + "TransactionPayload has unsupported compression type of %d", tp.compressionType) } - decompressedPayload, err := tp.decompress() - decompressedPayloadLen := uint64(len(decompressedPayload)) - if err != nil { - return vterrors.Wrapf(err, "error decompressing transaction payload") + err := tp.decompress() + if err != nil || tp.reader == nil { + return vterrors.Wrap(err, "error decompressing transaction payload") } - pos := uint64(0) - - for { - eventLenPosEnd := pos + BinlogEventLenOffset + 4 - if eventLenPosEnd > decompressedPayloadLen { // No more events in the payload - break + header := make([]byte, headerLen) + tp.iterator = func() (ble BinlogEvent, err error) { + bytesRead, err := io.ReadFull(tp.reader, header) + if err != nil { + if err == io.EOF { + return nil, io.EOF + } + return nil, vterrors.Wrap(err, "error reading event header from uncompressed transaction payload") } - eventLen := uint64(binary.LittleEndian.Uint32(decompressedPayload[pos+BinlogEventLenOffset : eventLenPosEnd])) - if pos+eventLen > decompressedPayloadLen { - return vterrors.New(vtrpcpb.Code_INTERNAL, - fmt.Sprintf("[BUG] event length of %d at pos %d in decompressed transaction payload is beyond the expected payload length of %d", - eventLen, pos, decompressedPayloadLen)) + if bytesRead != headerLen { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected header length of %d but only read %d bytes", + headerLen, bytesRead) } - eventData := decompressedPayload[pos : pos+eventLen] - ble := NewMysql56BinlogEvent(eventData) - tp.Events = append(tp.Events, ble) - - pos += eventLen + eventLen := 
int64(binary.LittleEndian.Uint32(header[binlogEventLenOffset:headerLen])) + // The event length is read from the payload itself and it includes + // the header, so anything shorter than the header is corrupt and + // would otherwise cause a slice bounds panic below. + if eventLen < headerLen { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] binlog event length of %d is shorter than the event header length of %d", + eventLen, headerLen) + } + eventData := make([]byte, eventLen) + copy(eventData, header) // The event includes the header + bytesRead, err = io.ReadFull(tp.reader, eventData[headerLen:]) + if err != nil && err != io.EOF { + return nil, vterrors.Wrap(err, "error reading binlog event data from uncompressed transaction payload") + } + if int64(bytesRead+headerLen) != eventLen { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected binlog event length of %d but only read %d bytes", + eventLen, bytesRead) + } + return NewMysql56BinlogEvent(eventData), nil } - return nil } -// Decompress the payload. -func (tp *TransactionPayload) decompress() ([]byte, error) { - if len(tp.Payload) == 0 { - return []byte{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "cannot decompress empty payload") +// decompress decompresses the payload. If the payload is larger than +// zstdInMemoryDecompressorMaxSize then we stream the decompression via +// the package's pool of zstd.Decoders, otherwise we use in-memory +// buffers with the package's concurrent statelessDecoder. +// In either case, we set up the reader that can be used within the +// iterator to read the events one at a time from the decompressed +// payload in GetNextEvent(). +func (tp *TransactionPayload) decompress() error { + if len(tp.payload) == 0 { + return vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "cannot decompress empty compressed transaction payload") } - var ( - decompressedBytes []byte - err error - ) - - // Switch to slower but less memory intensive stream mode for larger payloads. 
- if tp.UncompressedSize > zstdInMemoryDecompressorMaxSize { - in := bytes.NewReader(tp.Payload) - streamDecoder, err := zstd.NewReader(in) - if err != nil { - return nil, err - } - defer streamDecoder.Close() - out := io.Writer(&bytes.Buffer{}) - _, err = io.Copy(out, streamDecoder) - if err != nil { - return nil, err + + // Switch to slower but less memory intensive stream mode for + // larger payloads. + if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize { + in := bytes.NewReader(tp.payload) + streamDecoder := statefulDecoderPool.Get().(*zstd.Decoder) + if streamDecoder == nil { + return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") } - decompressedBytes = out.(*bytes.Buffer).Bytes() - } else { // Process smaller payloads using in-memory buffers. - decompressedBytes, err = zstdDecoder.DecodeAll(tp.Payload, nil) - if err != nil { - return nil, err + if err := streamDecoder.Reset(in); err != nil { + return vterrors.Wrap(err, "error resetting stateful stream decoder") } + compressedTrxPayloadsUsingStream.Add(1) + tp.reader = streamDecoder + return nil } - if uint64(len(decompressedBytes)) != tp.UncompressedSize { - return []byte{}, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, - fmt.Sprintf("decompressed size %d does not match expected size %d", len(decompressedBytes), tp.UncompressedSize)) + // Process smaller payloads using only in-memory buffers. 
+ if statelessDecoder == nil { // Should never happen + return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateless decoder") + } + decompressedBytes := make([]byte, 0, tp.uncompressedSize) // Perform a single pre-allocation + decompressedBytes, err := statelessDecoder.DecodeAll(tp.payload, decompressedBytes[:0]) + if err != nil { + return err } + if uint64(len(decompressedBytes)) != tp.uncompressedSize { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "uncompressed transaction payload size %d does not match expected size %d", len(decompressedBytes), tp.uncompressedSize) + } + compressedTrxPayloadsInMem.Add(1) + tp.reader = bytes.NewReader(decompressedBytes) + return nil +} - return decompressedBytes, nil +// Close should be called in a defer where the TransactionPayload is +// used to ensure that the underlying reader and related resources +// used are cleaned up. A stream decoder is handed back to the pool +// it was taken from in decompress(). Calling Close() more than once +// is a no-op after the first call. +func (tp *TransactionPayload) Close() { + switch reader := tp.reader.(type) { + case *zstd.Decoder: + // Detach the decoder from the payload before returning it to + // the pool; a decoder that fails to reset is left for the GC. + if err := reader.Reset(nil); err == nil || err == io.EOF { + statefulDecoderPool.Put(reader) + } + } + // Drop our reference so that a repeated Close() cannot put the + // same decoder into the pool twice. + tp.reader = nil + tp.iterator = nil } + +// GetNextEvent returns the next binlog event that was contained within +// the compressed transaction payload. It will return io.EOF when there +// are no more events left in the payload. +func (tp *TransactionPayload) GetNextEvent() (BinlogEvent, error) { + if tp == nil || tp.iterator == nil { + return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "TransactionPayload has been closed") + } + return tp.iterator() +} + +// Events returns an iterator over the internal binlog events that +// were contained within the compressed transaction payload/event. +// It returns a single-use iterator. +// TODO(mattlord): implement this when main is on go 1.23. 
See: +// - https://tip.golang.org/wiki/RangefuncExperiment +// - https://github.com/golang/go/blob/release-branch.go1.23/src/iter/iter.go +//func (tp *TransactionPayload) Events() iter.Seq[BinlogEvent] { +// return tp.iterator +//} diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index 4edc4bb91ff..8a2976da80d 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -247,8 +247,8 @@ func (ev filePosFakeEvent) Rows(BinlogFormat, *TableMap) (Rows, error) { return Rows{}, nil } -func (ev filePosFakeEvent) TransactionPayload(BinlogFormat) ([]BinlogEvent, error) { - return []BinlogEvent{}, nil +func (ev filePosFakeEvent) TransactionPayload(BinlogFormat) (*TransactionPayload, error) { + return &TransactionPayload{}, nil } func (ev filePosFakeEvent) NextLogFile(BinlogFormat) (string, uint64, error) { diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index e5fa3545278..f173e27e4af 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -17,8 +17,11 @@ limitations under the License. package mysql import ( + _ "embed" "fmt" + "io" "reflect" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -29,15 +32,16 @@ import ( // Sample event data for MySQL 5.6. 
var ( - mysql56FormatEvent = NewMysql56BinlogEvent([]byte{0x78, 0x4e, 0x49, 0x55, 0xf, 0x64, 0x0, 0x0, 0x0, 0x74, 0x0, 0x0, 0x0, 0x78, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x36, 0x2e, 0x32, 0x34, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x78, 0x4e, 0x49, 0x55, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x1, 0x18, 0x4a, 0xf, 0xca}) - mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27}) - // This is the result of: begin; insert into customer values (1, "mlord@planetscale.com"), (2, "sup@planetscale.com"); commit; - mysql56TransactionPayloadEvent = NewMysql56BinlogEvent([]byte{0xc7, 0xe1, 0x4b, 0x64, 0x28, 0x5b, 0xd2, 0xc7, 0x19, 0xdb, 0x00, 0x00, 0x00, 0x3a, 0x50, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x00, 0x03, 0x03, 0xfc, 0xfe, 0x00, 0x01, 0x01, 0xb8, 0x00, 0x28, 0xb5, 0x2f, 0xfd, 0x00, 0x58, 0x64, 0x05, 0x00, 0xf2, 0x49, 0x23, 0x2a, 0xa0, 0x27, 0x69, 0x0c, 0xff, 0xe8, 0x06, 0xeb, 0xfe, 0xc3, 0xab, 0x8a, 0x7b, 0xc0, 0x36, 0x42, 0x5c, 0x6f, 0x1b, 0x2f, 0xfb, 0x6e, 0xc4, 0x9a, 0xe6, 0x6e, 0x6b, 0xda, 0x08, 0xf1, 0x37, 0x7e, 0xff, 0xb8, 0x6c, 0xbc, 0x27, 0x3c, 0xb7, 0x4f, 0xee, 0x14, 0xff, 0xaf, 0x09, 0x06, 0x69, 0xe3, 0x12, 0x68, 0x4a, 0x6e, 0xc3, 0xe1, 0x28, 0xaf, 0x3f, 0xc8, 0x14, 0x1c, 0xc3, 0x60, 0xce, 0xe3, 0x1e, 0x18, 0x4c, 0x63, 0xa1, 0x35, 0x90, 0x79, 0x04, 0xe8, 0xa9, 0xeb, 0x4a, 0x1b, 0xd7, 0x41, 0x53, 0x72, 0x17, 0xa4, 0x23, 0xa4, 0x47, 0x68, 0x00, 0xa2, 0x37, 0xee, 
0xc1, 0xc7, 0x71, 0x30, 0x24, 0x19, 0xfd, 0x78, 0x49, 0x1b, 0x97, 0xd2, 0x94, 0xdc, 0x85, 0xa2, 0x21, 0xc1, 0xb0, 0x63, 0x8d, 0x7b, 0x0f, 0x32, 0x87, 0x07, 0xe2, 0x39, 0xf0, 0x7c, 0x3e, 0x01, 0xfe, 0x13, 0x8f, 0x11, 0xd0, 0x05, 0x9f, 0xbc, 0x18, 0x59, 0x91, 0x36, 0x2e, 0x6d, 0x4a, 0x6e, 0x0b, 0x00, 0x5e, 0x28, 0x10, 0xc0, 0x02, 0x50, 0x77, 0xe0, 0x64, 0x30, 0x02, 0x9e, 0x09, 0x54, 0xec, 0x80, 0x6d, 0x07, 0xa4, 0xc1, 0x7d, 0x60, 0xe4, 0x01, 0x78, 0x01, 0x01, 0x00, 0x00}) + mysql56FormatEvent = NewMysql56BinlogEvent([]byte{0x78, 0x4e, 0x49, 0x55, 0xf, 0x64, 0x0, 0x0, 0x0, 0x74, 0x0, 0x0, 0x0, 0x78, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x36, 0x2e, 0x32, 0x34, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x78, 0x4e, 0x49, 0x55, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x1, 0x18, 0x4a, 0xf, 0xca}) + mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27}) mysql56QueryEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 
0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3}) mysql56SemiSyncNoAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x00, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3}) mysql56SemiSyncAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x01, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3}) ) +//go:embed large_compressed_trx_payload.bin +var mysql56CompressedLargeTrxPayload []byte + func TestMysql56IsGTID(t *testing.T) { if got, want := mysql56FormatEvent.IsGTID(), false; got != want { t.Errorf("%#v.IsGTID() = %#v, want %#v", mysql56FormatEvent, got, want) @@ -94,46 +98,102 @@ func TestMysql56GTID(t *testing.T) { func 
TestMysql56DecodeTransactionPayload(t *testing.T) { format := NewMySQL56BinlogFormat() tableMap := &TableMap{} - require.True(t, mysql56TransactionPayloadEvent.IsTransactionPayload()) - - // The generated event is the result of the following SQL being executed in vtgate - // against the commerce keyspace: - // begin; insert into customer values (1, "mlord@planetscale.com"), (2, "sup@planetscale.com"); commit; - // All of these below internal events are encoded in the compressed transaction - // payload event. - want := []string{ - "BEGIN", // Query event - "vt_commerce.customer", // TableMap event - "[1 mlord@planetscale.com]", // WriteRows event - "[2 sup@planetscale.com]", // WriteRows event - "COMMIT", // XID event + + testCases := []struct { + name string + event BinlogEvent + want []string + inMemory bool + }{ + { + name: "Small event done in memory", + event: NewMysql56BinlogEvent([]byte{0xc7, 0xe1, 0x4b, 0x64, 0x28, 0x5b, 0xd2, 0xc7, 0x19, 0xdb, 0x00, 0x00, 0x00, 0x3a, 0x50, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x00, 0x03, 0x03, 0xfc, 0xfe, 0x00, 0x01, 0x01, 0xb8, 0x00, 0x28, 0xb5, 0x2f, 0xfd, 0x00, 0x58, 0x64, 0x05, 0x00, 0xf2, 0x49, 0x23, 0x2a, 0xa0, 0x27, 0x69, 0x0c, 0xff, 0xe8, 0x06, 0xeb, 0xfe, 0xc3, 0xab, 0x8a, 0x7b, 0xc0, 0x36, 0x42, 0x5c, 0x6f, 0x1b, 0x2f, 0xfb, 0x6e, 0xc4, 0x9a, 0xe6, 0x6e, 0x6b, 0xda, 0x08, 0xf1, 0x37, 0x7e, 0xff, 0xb8, 0x6c, 0xbc, 0x27, 0x3c, 0xb7, 0x4f, 0xee, 0x14, 0xff, 0xaf, 0x09, 0x06, 0x69, 0xe3, 0x12, 0x68, 0x4a, 0x6e, 0xc3, 0xe1, 0x28, 0xaf, 0x3f, 0xc8, 0x14, 0x1c, 0xc3, 0x60, 0xce, 0xe3, 0x1e, 0x18, 0x4c, 0x63, 0xa1, 0x35, 0x90, 0x79, 0x04, 0xe8, 0xa9, 0xeb, 0x4a, 0x1b, 0xd7, 0x41, 0x53, 0x72, 0x17, 0xa4, 0x23, 0xa4, 0x47, 0x68, 0x00, 0xa2, 0x37, 0xee, 0xc1, 0xc7, 0x71, 0x30, 0x24, 0x19, 0xfd, 0x78, 0x49, 0x1b, 0x97, 0xd2, 0x94, 0xdc, 0x85, 0xa2, 0x21, 0xc1, 0xb0, 0x63, 0x8d, 0x7b, 0x0f, 0x32, 0x87, 0x07, 0xe2, 0x39, 0xf0, 0x7c, 0x3e, 0x01, 0xfe, 0x13, 0x8f, 0x11, 0xd0, 0x05, 0x9f, 0xbc, 0x18, 0x59, 0x91, 0x36, 0x2e, 0x6d, 
0x4a, 0x6e, 0x0b, 0x00, 0x5e, 0x28, 0x10, 0xc0, 0x02, 0x50, 0x77, 0xe0, 0x64, 0x30, 0x02, 0x9e, 0x09, 0x54, 0xec, 0x80, 0x6d, 0x07, 0xa4, 0xc1, 0x7d, 0x60, 0xe4, 0x01, 0x78, 0x01, 0x01, 0x00, 0x00}), + // The generated event is the result of the following SQL being executed in vtgate + // against the commerce keyspace: + // begin; insert into customer values (1, "mlord@planetscale.com"), (2, "sup@planetscale.com"); commit; + // All of these below internal events are encoded in the compressed transaction + // payload event. + want: []string{ + "BEGIN", // Query event + "vt_commerce.customer", // TableMap event + "[1 mlord@planetscale.com]", // WriteRows event + "[2 sup@planetscale.com]", // WriteRows event + "COMMIT", // XID event + }, + inMemory: true, + }, + { + name: "Large event using streaming", + event: NewMysql56BinlogEvent(mysql56CompressedLargeTrxPayload), + // The generated event is the result of the following SQL being executed against the + // commerce keyspace after having added a LONGTEXT column to the customer + // table (this generates an uncompressed transaction that is over 128MiB): + // insert into customer values (1, "mlord@planetscale.com", repeat("test", 43280000)); + // All of these below internal events are encoded in the compressed transaction + // payload event. 
+ want: []string{ + "BEGIN", // Query event + "vt_commerce.customer", // TableMap event + "[1 mlord@planetscale.com testtesttesttesttesttesttesttest", // WriteRows event + "COMMIT", // XID event + }, + inMemory: false, + }, } - internalEvents, err := mysql56TransactionPayloadEvent.TransactionPayload(format) - require.NoError(t, err) - eventStrs := []string{} - for _, ev := range internalEvents { - switch { - case ev.IsTableMap(): - tableMap, err = ev.TableMap(format) - require.NoError(t, err) - eventStrs = append(eventStrs, fmt.Sprintf("%s.%s", tableMap.Database, tableMap.Name)) - case ev.IsQuery(): - query, err := ev.Query(format) - require.NoError(t, err) - eventStrs = append(eventStrs, query.SQL) - case ev.IsWriteRows(): - rows, err := ev.Rows(format, tableMap) - require.NoError(t, err) - for i := range rows.Rows { - rowStr, err := rows.StringValuesForTests(tableMap, i) + + for _, tc := range testCases { + memDecodingCnt := compressedTrxPayloadsInMem.Get() + streamDecodingCnt := compressedTrxPayloadsUsingStream.Get() + + require.True(t, tc.event.IsTransactionPayload()) + tp, err := tc.event.TransactionPayload(format) + require.NoError(t, err) + defer tp.Close() + eventStrs := []string{} + for { + ev, err := tp.GetNextEvent() + if err != nil { + if err == io.EOF { + break + } + require.Fail(t, fmt.Sprintf("unexpected error: %v", err)) + } + switch { + case ev.IsTableMap(): + tableMap, err = ev.TableMap(format) require.NoError(t, err) - eventStrs = append(eventStrs, fmt.Sprintf("%v", rowStr)) + eventStrs = append(eventStrs, fmt.Sprintf("%s.%s", tableMap.Database, tableMap.Name)) + case ev.IsQuery(): + query, err := ev.Query(format) + require.NoError(t, err) + eventStrs = append(eventStrs, query.SQL) + case ev.IsWriteRows(): + rows, err := ev.Rows(format, tableMap) + require.NoError(t, err) + for i := range rows.Rows { + rowStr, err := rows.StringValuesForTests(tableMap, i) + require.NoError(t, err) + eventStrs = append(eventStrs, fmt.Sprintf("%v", rowStr)) + } + 
case ev.IsXID(): + eventStrs = append(eventStrs, "COMMIT") + } + } + if tc.inMemory { + require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get()) + require.Equal(t, tc.want, eventStrs) + } else { + require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get()) + require.Len(t, eventStrs, len(tc.want)) + totalSize := 0 + for i, want := range tc.want { + eventStr := eventStrs[i] + totalSize += len(eventStr) + require.True(t, strings.HasPrefix(eventStr, want)) } - case ev.IsXID(): - eventStrs = append(eventStrs, "COMMIT") + require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize) } } - require.Equal(t, want, eventStrs) } func TestMysql56ParsePosition(t *testing.T) { diff --git a/go/mysql/large_compressed_trx_payload.bin b/go/mysql/large_compressed_trx_payload.bin new file mode 100644 index 0000000000000000000000000000000000000000..8d5d10a0d5090491e0ef1e211422931bec89c23b GIT binary patch literal 16090 zcmeI(&nts*90%~v^E~@$Vh7sHWm)u;pAjQkieiOQ+i-c3gB+%w+M{TZSfa#9;-D=K zLdm40WI2cfIVdGj3QY=&gL<~dp8vpg_5Pkd-|y-B>HWIC8DVaaKffRMKe&jj3zR-A zL!2@7vD%%b-WX2acM*S3nVjrZ-$Gg{$|^1s-8WWyQb`4sz!K#jnfOqh*Z(ctFLy=C zuS@H6{AkSFZrr;Rohz*5+G@J(KAP4$oblkSJv7zow(2K#hg-%fbiM*#Ro}H>F}4LQ z4QoTD$&JoqK0YsAB!n+^r@Es*8XmD(Z7;8zmUSt|^JEi`h1|KT8QGav_^vsT&lbff zx1C*6V!AUD|NHo?cf7?|=F=M<(EAY6CIdAGjDLwg3PC literal 0 HcmV?d00001 diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index bf41111bbc8..3413c53d811 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -642,15 +642,23 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e return nil, fmt.Errorf("compressed transaction payload events are not supported with database flavor %s", vs.vse.env.Config().DB.Flavor) } - tpevents, err := ev.TransactionPayload(vs.format) + tp, err := ev.TransactionPayload(vs.format) if err != nil { return nil, err } + defer 
tp.Close() // Events inside the payload don't have their own checksum. ogca := vs.format.ChecksumAlgorithm defer func() { vs.format.ChecksumAlgorithm = ogca }() vs.format.ChecksumAlgorithm = mysql.BinlogChecksumAlgOff - for _, tpevent := range tpevents { + for { + tpevent, err := tp.GetNextEvent() + if err != nil { + if err == io.EOF { + break + } + return nil, err + } tpvevents, err := vs.parseEvent(tpevent) if err != nil { return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event")