diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2b770c1d4f4..854157b1546 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -88,7 +88,7 @@ type uvstreamer struct { config *uvstreamerConfig - vs *vstreamer //last vstreamer created in uvstreamer + vs *vstreamer // last vstreamer created in uvstreamer } type uvstreamerConfig struct { @@ -138,6 +138,9 @@ func (uvs *uvstreamer) buildTablePlan() error { uvs.plans = make(map[string]*tablePlan) tableLastPKs := make(map[string]*binlogdatapb.TableLastPK) for _, tablePK := range uvs.inTablePKs { + if tablePK != nil && tablePK.Lastpk != nil && len(tablePK.Lastpk.Fields) == 0 { + return fmt.Errorf("lastpk for table %s has no fields defined", tablePK.TableName) + } tableLastPKs[tablePK.TableName] = tablePK } tables := uvs.se.GetSchema() @@ -313,7 +316,6 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error { } behind := time.Now().UnixNano() - uvs.lastTimestampNs uvs.setReplicationLagSeconds(behind / 1e9) - //log.Infof("sbm set to %d", uvs.ReplicationLagSeconds) var evs2 []*binlogdatapb.VEvent if len(uvs.plans) > 0 { evs2 = uvs.filterEvents(evs) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 0eda0d6c52e..0fb9a841a7c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -83,6 +83,42 @@ func (tfe *TestFieldEvent) String() string { return s } +// TestVStreamMissingFieldsInLastPK tests that we error out if the lastpk for a table is missing the fields spec. +func TestVStreamMissingFieldsInLastPK(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + oldEngine := engine + engine = nil + oldEnv := env + env = nil + newEngine(t, ctx, "noblob") + defer func() { + engine = oldEngine + env = oldEnv + }() + execStatements(t, []string{ + "create table t1(id int, blb blob, val varchar(4), primary key(id))", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + var tablePKs []*binlogdatapb.TableLastPK + tablePKs = append(tablePKs, getTablePK("t1", 1)) + for _, tpk := range tablePKs { + tpk.Lastpk.Fields = nil + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + ch := make(chan []*binlogdatapb.VEvent) + err := vstream(ctx, t, "", tablePKs, filter, ch) + require.ErrorContains(t, err, "lastpk for table t1 has no fields defined") +} + // TestPlayerNoBlob sets up a new environment with mysql running with binlog_row_image as noblob. It confirms that // the VEvents created are correct: that they don't contain the missing columns and that the DataColumns bitmap is sent func TestNoBlob(t *testing.T) {