From e7aea12f3f50a5402982b8a119ec308fb055a504 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 22 Apr 2024 10:42:49 -0400 Subject: [PATCH] Adjust for v17 Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 18 +- go/vt/vtgate/vstream_manager_test.go | 318 +--------------------- go/vt/vttablet/sandboxconn/sandboxconn.go | 17 -- 3 files changed, 12 insertions(+), 341 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 326497270bb..b7b5318c149 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -25,11 +25,10 @@ import ( "sync" "time" + "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" -<<<<<<< HEAD -======= - "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" @@ -38,21 +37,10 @@ import ( "vitess.io/vitess/go/vt/vterrors" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ->>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761)) querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/topo" - - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - - "google.golang.org/protobuf/proto" - - "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" ) // vstreamManager manages vstream requests. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 911ca4a8ac6..259bf862910 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -25,36 +25,26 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/topo" - - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/vttablet/sandboxconn" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" -<<<<<<< HEAD - "vitess.io/vitess/go/vt/proto/binlogdata" -======= "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/sandboxconn" ->>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761)) binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/srvtopo" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var mu sync.Mutex @@ -1199,297 +1189,9 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } -<<<<<<< HEAD -func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { - gw := NewTabletGateway(context.Background(), hc, serv, cell) -======= -func TestKeyspaceHasBeenSharded(t *testing.T) { - ctx := utils.LeakCheckContext(t) - - cell := "zone1" - ks := "testks" - - type testcase struct { - name string - oldshards []string - newshards []string - vgtid *binlogdatapb.VGtid - trafficSwitched bool - want bool - wantErr string - } - testcases := []testcase{ - { - name: "2 to 4, split both, traffic not switched", - oldshards: []string{ - "-80", - "80-", - }, - newshards: []string{ - "-40", - "40-80", - "80-c0", - "c0-", - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-80", - }, - { - Keyspace: ks, - Shard: "80-", - }, - }, - }, - trafficSwitched: false, - want: false, - }, - { - name: "2 to 4, split both, traffic not switched", - oldshards: []string{ - "-80", - "80-", - }, - newshards: []string{ - "-40", - "40-80", - "80-c0", - "c0-", - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-80", - }, - { - Keyspace: ks, - Shard: "80-", - }, - }, - }, - trafficSwitched: false, - want: false, - }, - { - name: "2 to 8, split both, traffic switched", - oldshards: []string{ - "-80", - "80-", - }, - newshards: []string{ - "-20", - "20-40", - "40-60", - "60-80", - "80-a0", - "a0-c0", - "c0-e0", - "e0-", - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-80", - }, - { - Keyspace: ks, - Shard: "80-", - }, - }, - }, - trafficSwitched: true, - want: true, - }, - { - name: "2 to 4, split only first shard, traffic switched", - oldshards: []string{ - "-80", - "80-", - }, - newshards: []string{ - "-20", - "20-40", - "40-60", - "60-80", - // 80- is not being resharded. - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-80", - }, - { - Keyspace: ks, - Shard: "80-", - }, - }, - }, - trafficSwitched: true, - want: true, - }, - { - name: "4 to 2, merge both shards, traffic switched", - oldshards: []string{ - "-40", - "40-80", - "80-c0", - "c0-", - }, - newshards: []string{ - "-80", - "80-", - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-40", - }, - { - Keyspace: ks, - Shard: "40-80", - }, - { - Keyspace: ks, - Shard: "80-c0", - }, - { - Keyspace: ks, - Shard: "c0-", - }, - }, - }, - trafficSwitched: true, - want: true, - }, - { - name: "4 to 3, merge second half, traffic not switched", - oldshards: []string{ - "-40", - "40-80", - "80-c0", - "c0-", - }, - newshards: []string{ - // -40 and 40-80 are not being resharded. - "80-", // Merge of 80-c0 and c0- - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-40", - }, - { - Keyspace: ks, - Shard: "40-80", - }, - { - Keyspace: ks, - Shard: "80-c0", - }, - { - Keyspace: ks, - Shard: "c0-", - }, - }, - }, - trafficSwitched: false, - want: false, - }, - { - name: "4 to 3, merge second half, traffic switched", - oldshards: []string{ - "-40", - "40-80", - "80-c0", - "c0-", - }, - newshards: []string{ - // -40 and 40-80 are not being resharded. - "80-", // Merge of 80-c0 and c0- - }, - vgtid: &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{ - { - Keyspace: ks, - Shard: "-40", - }, - { - Keyspace: ks, - Shard: "40-80", - }, - { - Keyspace: ks, - Shard: "80-c0", - }, - { - Keyspace: ks, - Shard: "c0-", - }, - }, - }, - trafficSwitched: true, - want: true, - }, - } - - addTablet := func(t *testing.T, ctx context.Context, host string, port int32, cell, ks, shard string, ts *topo.Server, hc *discovery.FakeHealthCheck, serving bool) { - tabletconn := hc.AddTestTablet(cell, host, port, ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) - err := ts.CreateTablet(ctx, tabletconn.Tablet()) - require.NoError(t, err) - var alias *topodatapb.TabletAlias - if serving { - alias = tabletconn.Tablet().Alias - } - _, err = ts.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = alias - si.IsPrimaryServing = serving - return nil - }) - require.NoError(t, err) - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - hc := discovery.NewFakeHealthCheck(nil) - _ = createSandbox(ks) - st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...)) - vsm := newTestVStreamManager(ctx, hc, st, cell) - vs := vstream{ - vgtid: tc.vgtid, - tabletType: topodatapb.TabletType_PRIMARY, - optCells: cell, - vsm: vsm, - ts: st.topoServer, - } - for i, shard := range tc.oldshards { - addTablet(t, ctx, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), cell, ks, shard, st.topoServer, hc, !tc.trafficSwitched) - } - for i, shard := range tc.newshards { - addTablet(t, ctx, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), cell, ks, shard, st.topoServer, hc, tc.trafficSwitched) - } - got, err := vs.keyspaceHasBeenResharded(ctx, ks) - if tc.wantErr != "" { - require.EqualError(t, err, tc.wantErr) - } else { - require.NoError(t, err) - } - require.Equal(t, tc.want, got) - }) - } -} - // TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by // the vstream manager to confirm that we are correctly restarting the vstream when we should. func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { - ctx := utils.LeakCheckContext(t) - // Capture the vstream warning log. Otherwise we need to re-implement the vstream error // handling in SandboxConn's implementation and then we're not actually testing the // production code. @@ -1503,7 +1205,7 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{shard}) - vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell) vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: ks, @@ -1512,7 +1214,7 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { } source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil) tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias) - addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet()) + addTabletToSandboxTopo(t, st, ks, shard, source.Tablet()) target := &querypb.Target{ Cell: cell, Keyspace: ks, @@ -1548,8 +1250,7 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { Shard: shard, TabletType: topodatapb.TabletType_PRIMARY, }, - PrimaryTermStartTimestamp: time.Now().Unix(), - RealtimeStats: &querypb.RealtimeStats{}, + RealtimeStats: &querypb.RealtimeStats{}, }, wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s", tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()), @@ -1606,9 +1307,8 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { } } -func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { - gw := NewTabletGateway(ctx, hc, serv, cell) ->>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761)) +func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { + gw := NewTabletGateway(context.Background(), hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) return newVStreamManager(srvResolver, serv, cell) } diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 97a3dbf6f09..d4108d66b08 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -122,22 +122,11 @@ type SandboxConn struct { NotServing bool -<<<<<<< HEAD getSchemaResult []map[string]string -======= - getSchemaResult []SchemaResult - - parser *sqlparser.Parser streamHealthResponse *querypb.StreamHealthResponse } -type SchemaResult struct { - TablesAndViews map[string]string - UDFs []*querypb.UDFInfo ->>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761)) -} - var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check // NewSandboxConn returns a new SandboxConn targeted to the provided tablet. @@ -428,11 +417,6 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target, // SandboxSQRowCount is the default number of fake splits returned. var SandboxSQRowCount = int64(10) -<<<<<<< HEAD -// StreamHealth is not implemented. -func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { - return fmt.Errorf("not implemented in test") -======= // SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth. func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) { sbc.mapMu.Lock() @@ -449,7 +433,6 @@ func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb return callback(sbc.streamHealthResponse) } return nil ->>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761)) } // ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos.