diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index de4546d5d0d..a48a22f2cb0 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -82,13 +82,14 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { // Stop replication on the non-PRIMARY tablets. _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave") require.NoError(t, err) - _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteMultiFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") require.NoError(t, err) // Restart replication afterward as the cluster is re-used. defer func() { _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave") require.NoError(t, err) - _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave") + // Testing ExecuteMultiFetchAsDBA by running multiple commands in a single call: + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteMultiFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave sql_thread; start slave io_thread;") require.NoError(t, err) }() time.Sleep(2 * time.Second) // Build up some replication lag diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go index ca0b3b15818..f4d74f1e205 100644 --- a/go/test/endtoend/tabletmanager/commands_test.go +++ b/go/test/endtoend/tabletmanager/commands_test.go @@ -60,12 +60,21 @@ func TestTabletCommands(t *testing.T) { utils.Exec(t, conn, "insert into t1(id, value) values(1,'a'), (2,'b')") checkDataOnReplica(t, replicaConn, `[[VARCHAR("a")] [VARCHAR("b")]]`) - // make sure direct dba queries work - sql := "select * from t1" - result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", "--json", primaryTablet.Alias, sql) - require.Nil(t, err) - assertExecuteFetch(t, result) + t.Run("ExecuteFetchAsDBA", func(t *testing.T) { + // make sure direct dba queries work + sql := "select * from t1" + result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", "--json", primaryTablet.Alias, sql) + require.Nil(t, err) + assertExecuteFetch(t, result) + }) + t.Run("ExecuteMultiFetchAsDBA", func(t *testing.T) { + // make sure direct dba queries work + sql := "select * from t1; select * from t1 limit 100" + result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteMultiFetchAsDBA", "--json", primaryTablet.Alias, sql) + require.Nil(t, err) + assertExecuteFetch(t, result) + }) // check Ping / RefreshState / RefreshStateByShard err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias) require.Nil(t, err, "error should be Nil") @@ -139,6 +148,22 @@ func assertExecuteFetch(t *testing.T, qr string) { want = int(2) assert.Equal(t, want, got) } +func assertExecuteMultiFetch(t *testing.T, qr string) { + resultMap := make([]map[string]any, 0) + err := json.Unmarshal([]byte(qr), &resultMap) + require.Nil(t, err) + require.NotEmpty(t, resultMap) + + rows := reflect.ValueOf(resultMap[0]["rows"]) + got := rows.Len() + want := int(2) + assert.Equal(t, want, got) + + fields := reflect.ValueOf(resultMap[0]["fields"]) + got = fields.Len() + want = int(2) + assert.Equal(t, want, got) +} func TestHook(t *testing.T) { // test a regular program works diff --git a/go/vt/vtctl/grpcvtctldserver/server_test.go b/go/vt/vtctl/grpcvtctldserver/server_test.go index 1a473fdad75..b3546b7a4a4 100644 --- a/go/vt/vtctl/grpcvtctldserver/server_test.go +++ b/go/vt/vtctl/grpcvtctldserver/server_test.go @@ -4849,6 +4849,135 @@ func TestExecuteFetchAsDBA(t *testing.T) { } } +func TestExecuteMultiFetchAsDBA(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + tablet *topodatapb.Tablet + tmc *testutil.TabletManagerClient + req *vtctldatapb.ExecuteMultiFetchAsDBARequest + expected *vtctldatapb.ExecuteMultiFetchAsDBAResponse + shouldErr bool + }{ + { + name: "ok", + tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + tmc: &testutil.TabletManagerClient{ + ExecuteMultiFetchAsDbaResults: map[string]struct { + Response []*querypb.QueryResult + Error error + }{ + "zone1-0000000100": { + Response: []*querypb.QueryResult{ + {InsertId: 100}, + {InsertId: 101}, + }, + }, + }, + }, + req: &vtctldatapb.ExecuteMultiFetchAsDBARequest{ + TabletAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Sql: "select 1; select 2", + }, + expected: &vtctldatapb.ExecuteMultiFetchAsDBAResponse{ + Results: []*querypb.QueryResult{ + {InsertId: 100}, + {InsertId: 101}, + }, + }, + }, + { + name: "tablet not found", + tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + tmc: &testutil.TabletManagerClient{ + ExecuteMultiFetchAsDbaResults: map[string]struct { + Response []*querypb.QueryResult + Error error + }{ + "zone1-0000000100": { + Response: []*querypb.QueryResult{ + {InsertId: 100}, + {InsertId: 101}, + }, + }, + }, + }, + req: &vtctldatapb.ExecuteMultiFetchAsDBARequest{ + TabletAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 404, + }, + Sql: "select 1; select 2;", + }, + shouldErr: true, + }, + { + name: "query error", + tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + tmc: &testutil.TabletManagerClient{ + ExecuteMultiFetchAsDbaResults: map[string]struct { + Response []*querypb.QueryResult + Error error + }{ + "zone1-0000000100": { + Error: assert.AnError, + }, + }, + }, + req: &vtctldatapb.ExecuteMultiFetchAsDBARequest{ + TabletAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Sql: "select 1; select 2", + }, + shouldErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts := memorytopo.NewServer(ctx, "zone1") + testutil.AddTablet(ctx, t, ts, tt.tablet, nil) + + vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, tt.tmc, func(ts *topo.Server) vtctlservicepb.VtctldServer { + return NewVtctldServer(vtenv.NewTestEnv(), ts) + }) + resp, err := vtctld.ExecuteMultiFetchAsDBA(ctx, tt.req) + if tt.shouldErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + utils.MustMatch(t, tt.expected, resp) + }) + } +} + func TestExecuteHook(t *testing.T) { t.Parallel() diff --git a/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go b/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go index d7373fc6cc9..1495d18bdf5 100644 --- a/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go +++ b/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go @@ -208,6 +208,13 @@ type TabletManagerClient struct { Error error } // keyed by tablet alias. + ExecuteMultiFetchAsDbaDelays map[string]time.Duration + // keyed by tablet alias. + ExecuteMultiFetchAsDbaResults map[string]struct { + Response []*querypb.QueryResult + Error error + } + // keyed by tablet alias. ExecuteHookDelays map[string]time.Duration // keyed by tablet alias. ExecuteHookResults map[string]struct { @@ -551,15 +558,15 @@ func (fake *TabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet * return nil, fmt.Errorf("%w: no ExecuteFetchAsDba result set for tablet %s", assert.AnError, key) } -// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface. +// ExecuteMultiFetchAsDba is part of the tmclient.TabletManagerClient interface. func (fake *TabletManagerClient) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest) ([]*querypb.QueryResult, error) { - if fake.ExecuteFetchAsDbaResults == nil { + if fake.ExecuteMultiFetchAsDbaResults == nil { return nil, fmt.Errorf("%w: no ExecuteMultiFetchAsDba results on fake TabletManagerClient", assert.AnError) } key := topoproto.TabletAliasString(tablet.Alias) - if fake.ExecuteFetchAsDbaDelays != nil { - if delay, ok := fake.ExecuteFetchAsDbaDelays[key]; ok { + if fake.ExecuteMultiFetchAsDbaDelays != nil { + if delay, ok := fake.ExecuteMultiFetchAsDbaDelays[key]; ok { select { case <-ctx.Done(): return nil, ctx.Err() @@ -568,8 +575,8 @@ func (fake *TabletManagerClient) ExecuteMultiFetchAsDba(ctx context.Context, tab } } } - if result, ok := fake.ExecuteFetchAsDbaResults[key]; ok { - return []*querypb.QueryResult{result.Response}, result.Error + if result, ok := fake.ExecuteMultiFetchAsDbaResults[key]; ok { + return result.Response, result.Error } return nil, fmt.Errorf("%w: no ExecuteMultiFetchAsDba result set for tablet %s", assert.AnError, key)