Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throttler (v22): towards formal gRPC calls in endtoend tests, removing HTTP API calls #16530

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
67 changes: 5 additions & 62 deletions go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ import (
"context"
"flag"
"fmt"
"io"
"math/rand/v2"
"net/http"
"os"
"path"
"runtime"
Expand All @@ -62,6 +60,7 @@ import (
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
Expand Down Expand Up @@ -240,9 +239,9 @@ func TestOnlineDDLFlow(t *testing.T) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
_, statusCode, err := throttlerCheck(primaryTablet.VttabletProcess, throttlerapp.OnlineDDLName)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil)
assert.NoError(t, err)
throttleWorkload.Store(statusCode != http.StatusOK)
throttleWorkload.Store(resp.Check.ResponseCode != tabletmanagerdatapb.CheckThrottlerResponseCode_OK)
select {
case <-ticker.C:
case <-workloadCtx.Done():
Expand Down Expand Up @@ -282,7 +281,7 @@ func TestOnlineDDLFlow(t *testing.T) {
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)
onlineddl.ThrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)
waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusExpectationFailed)
throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, migrationWaitTimeout)
})
t.Run("unthrottle online-ddl", func(t *testing.T) {
onlineddl.UnthrottleAllMigrations(t, &vtParams)
Expand All @@ -292,7 +291,7 @@ func TestOnlineDDLFlow(t *testing.T) {

t.Logf("Throttler status: %+v", status)
}
waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusOK)
throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, migrationWaitTimeout)
})
t.Run("apply more DML", func(t *testing.T) {
// Looking to run a substantial amount of DML, giving vreplication
Expand Down Expand Up @@ -606,59 +605,3 @@ func initTable(t *testing.T) {
assert.Greater(t, appliedDMLEnd, appliedDMLStart)
assert.GreaterOrEqual(t, appliedDMLEnd-appliedDMLStart, int64(maxTableRows))
}

func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.TabletHostname, tablet.Port, path)
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, err
}

func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String()))
}

func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String()))
}

func throttlerCheck(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, statusCode int, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String())
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", 0, err
}
defer resp.Body.Close()
statusCode = resp.StatusCode
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, statusCode, err
}

// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check
func waitForThrottleCheckStatus(t *testing.T, throttlerApp throttlerapp.Name, tablet *cluster.Vttablet, wantCode int) {
ctx, cancel := context.WithTimeout(context.Background(), migrationWaitTimeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
respBody, statusCode, err := throttlerCheck(tablet.VttabletProcess, throttlerApp)
require.NoError(t, err)

if wantCode == statusCode {
return
}
select {
case <-ctx.Done():
assert.Equalf(t, wantCode, statusCode, "body: %s", respBody)
return
case <-ticker.C:
}
}
}
6 changes: 3 additions & 3 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"math/rand/v2"
"net/http"
"os"
"path"
"strings"
Expand All @@ -33,6 +32,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestRevertSchemaChanges(t *testing.T) {
require.Equal(t, 1, len(shards))

throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, http.StatusOK, time.Minute)
throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute)

t.Run("revertible", testRevertible)
t.Run("revert", testRevert)
Expand Down Expand Up @@ -415,7 +415,7 @@ func testRevertible(t *testing.T) {
toStatement := fmt.Sprintf(createTableWrapper, testcase.toSchema)
uuid = testOnlineDDLStatement(t, toStatement, ddlStrategy, "vtgate", tableName, "")
if !onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) {
resp, err := throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.TestingName, nil)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil)
assert.NoError(t, err)
fmt.Println("Throttler check response: ", resp)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func waitForMessage(t *testing.T, uuid string, messageSubstring string) {
case <-ticker.C:
case <-ctx.Done():
{
resp, err := throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.TestingName, nil)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil)
assert.NoError(t, err)
fmt.Println("Throttler check response: ", resp)

Expand Down Expand Up @@ -658,7 +658,7 @@ func testScheduler(t *testing.T) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil)
throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil)
select {
case <-ticker.C:
case <-ctx.Done():
Expand Down
91 changes: 25 additions & 66 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"flag"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -89,14 +88,6 @@ var (
}
}
}`

httpClient = base.SetupHTTPClient(time.Second)
throttledAppsAPIPath = "throttler/throttled-apps"
statusAPIPath = "throttler/status"
getResponseBody = func(resp *http.Response) string {
body, _ := io.ReadAll(resp.Body)
return string(body)
}
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -162,26 +153,13 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody string, err error) {
resp, err = httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, throttledAppsAPIPath))
if err != nil {
return resp, respBody, err
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return resp, respBody, err
}
respBody = string(b)
return resp, respBody, err
}

func vitessThrottleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*vtctldatapb.CheckThrottlerResponse, error) {
flags := &throttle.CheckFlags{
Scope: base.ShardScope,
SkipRequestHeartbeats: skipRequestHeartbeats,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, throttlerapp.VitessName, flags)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, throttlerapp.VitessName, flags)
return resp, err
}

Expand All @@ -191,7 +169,7 @@ func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*vtctl
SkipRequestHeartbeats: skipRequestHeartbeats,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, testAppName, flags)
return resp, err
}

Expand All @@ -200,18 +178,14 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*vtctldatapb.CheckThrottlerRes
Scope: base.SelfScope,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, testAppName, flags)
return resp, err
}

func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath))
func throttleStatus(t *testing.T, tablet *cluster.Vttablet) *tabletmanagerdatapb.GetThrottlerStatusResponse {
status, err := throttler.GetThrottlerStatus(&clusterInstance.VtctldClientProcess, tablet)
require.NoError(t, err)
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
return string(b)
return status
}

func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCode {
Expand All @@ -221,32 +195,19 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod
require.NoError(t, err)

time.Sleep(time.Second)
t.Logf("resp.StatusCode: %v", resp.Check.StatusCode)
t.Logf("resp.ResponseCode: %v", resp.Check.ResponseCode)
return throttle.ResponseCodeFromStatus(resp.Check.ResponseCode, int(resp.Check.StatusCode))
}

// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check
func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) (*tabletmanagerdatapb.CheckThrottlerResponse, bool) {
_ = warmUpHeartbeat(t)
ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
resp, err := throttleCheck(tablet, true)
require.NoError(t, err)

if wantCode == resp.Check.ResponseCode {
// Wait for any cached check values to be cleared and the new
// status value to be in effect everywhere before returning.
return resp.Check, true
}
select {
case <-ctx.Done():
return resp.Check, false
case <-ticker.C:
}
}
flags := &throttle.CheckFlags{SkipRequestHeartbeats: true}
resp, ok := throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, tablet, throttlerapp.TestingName, flags, wantCode, onDemandHeartbeatDuration*4)
require.NotNil(t, resp)
return resp.Check, ok
}

func vtgateExec(t *testing.T, query string, expectError string) *sqltypes.Result {
Expand Down Expand Up @@ -366,6 +327,7 @@ func TestInitialThrottler(t *testing.T) {
})
t.Run("requesting heartbeats", func(t *testing.T) {
respStatus := warmUpHeartbeat(t)
t.Logf("respStatus: %v", respStatus)
assert.NotEqual(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, respStatus)
})
t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) {
Expand All @@ -378,7 +340,7 @@ func TestInitialThrottler(t *testing.T) {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}

if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
Expand All @@ -403,7 +365,7 @@ func TestInitialThrottler(t *testing.T) {
for _, metrics := range resp.Check.Metrics {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
Expand Down Expand Up @@ -476,22 +438,19 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK)
})
t.Run("validating throttled apps", func(t *testing.T) {
resp, body, err := throttledApps(primaryTablet)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
assert.Contains(t, body, throttlerapp.TestingAlwaysThrottlerName)
status := throttleStatus(t, primaryTablet)
assert.Contains(t, status.ThrottledApps, throttlerapp.TestingAlwaysThrottlerName.String())
})
t.Run("validating primary check self", func(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating replica check self", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -530,7 +489,7 @@ func TestLag(t *testing.T) {
assert.Equal(t, base.SelfScope.String(), metrics.Scope)
}
// self (on primary) is unaffected by replication lag
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
Expand Down Expand Up @@ -624,13 +583,13 @@ func TestLag(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("replica self-check should be fine", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -675,7 +634,7 @@ func TestCustomQuery(t *testing.T) {
throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("test threads running", func(t *testing.T) {
Expand Down Expand Up @@ -714,7 +673,7 @@ func TestCustomQuery(t *testing.T) {
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
}
})
Expand All @@ -738,7 +697,7 @@ func TestRestoreDefaultQuery(t *testing.T) {
t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) {
Expand Down
Loading
Loading