Skip to content

Commit

Permalink
Acquire schema engine mutex while marshaling schema
Browse files Browse the repository at this point in the history
This change is purely defensive, but it may help avoid future
race conditions caused by modifying shared schema structs while
they are being marshaled to protobuf.

Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar authored and austenLacy committed Jun 20, 2023
1 parent 14548d2 commit bf09bf7
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 77 deletions.
18 changes: 15 additions & 3 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sync"
"testing"

Expand Down Expand Up @@ -330,9 +331,9 @@ func TestVStreamSharded(t *testing.T) {
received bool
}
expectedEvents := []*expectedEvent{
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false},
}
for {
Expand All @@ -357,7 +358,7 @@ func TestVStreamSharded(t *testing.T) {
for _, ev := range evs {
s := fmt.Sprintf("%v", ev)
for _, expectedEv := range expectedEvents {
if expectedEv.ev == s {
if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) {
expectedEv.received = true
break
}
Expand All @@ -381,6 +382,17 @@ func TestVStreamSharded(t *testing.T) {

}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
28 changes: 28 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -587,6 +589,32 @@ func (se *Engine) GetSchema() map[string]*Table {
return tables
}

// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema
func (se *Engine) MarshalMinimalSchema() ([]byte, error) {
se.mu.Lock()
defer se.mu.Unlock()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)),
}
for _, table := range se.tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
}
return proto.Marshal(dbSchema)
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
pkc := make([]int64, len(st.PKColumns))
for i, pk := range st.PKColumns {
pkc[i] = int64(pk)
}
table.PKColumns = pkc
return table
}

// GetConnection returns a connection from the pool
func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
return se.conns.Get(ctx, nil)
Expand Down
25 changes: 3 additions & 22 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -240,14 +238,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
}
for _, table := range tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
blob, err := tr.engine.MarshalMinimalSchema()
if err != nil {
return err
}
blob, _ := proto.Marshal(dbSchema)

conn, err := tr.engine.GetConnection(ctx)
if err != nil {
Expand All @@ -265,19 +259,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
return nil
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
var pkc []int64
for _, pk := range st.PKColumns {
pkc = append(pkc, int64(pk))
}
table.PKColumns = pkc
return table
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,14 @@ func (rs *rowStreamer) buildPlan() error {
return err
}
ti := &Table{
Name: st.Name,
Fields: st.Fields,
Name: st.Name,
}

ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields)
if err != nil {
return err
}

// The plan we build is identical to the one for vstreamer.
// This is because the row format of a read is identical
// to the row format of a binlog event. So, the same
Expand Down
Loading

0 comments on commit bf09bf7

Please sign in to comment.