Skip to content

Commit

Permalink
Tablet throttler: read and use MySQL host metrics (#16904)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Nov 20, 2024
1 parent 2c6e053 commit fb79106
Show file tree
Hide file tree
Showing 25 changed files with 735 additions and 118 deletions.
33 changes: 33 additions & 0 deletions go/osutil/loadavg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package osutil

import (
"fmt"
"strconv"
"strings"
)

// parseLoadAvg parses the load average from the content of /proc/loadavg or sysctl output.
// Input such as "1.00 0.99 0.98 1/1 1", "2.83 3.01 3.36"
func parseLoadAvg(content string) (float64, error) {
fields := strings.Fields(content)
if len(fields) == 0 {
return 0, fmt.Errorf("unexpected loadavg content: %s", content)
}
return strconv.ParseFloat(fields[0], 64)
}
40 changes: 40 additions & 0 deletions go/osutil/loadavg_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//go:build darwin

/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package osutil

import (
"fmt"
"os/exec"
)

// LoadAvg returns the past 1 minute system load average. This works on linux and darwin systems.
// On other systems, it returns 0 with no error.
func LoadAvg() (float64, error) {
cmd := exec.Command("sysctl", "-n", "vm.loadavg")
// Sample output: `{ 2.83 3.01 3.36 }`
output, err := cmd.CombinedOutput()
if err != nil {
return 0, err
}
if len(output) < 1 {
return 0, fmt.Errorf("unexpected sysctl output: %q", output)
}
output = output[1:] // Remove the leading `{ `
return parseLoadAvg(string(output))
}
33 changes: 33 additions & 0 deletions go/osutil/loadavg_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//go:build linux

/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package osutil

import (
"os"
)

// LoadAvg returns the past 1 minute system load average. This works on linux and darwin systems.
// On other systems, it returns 0 with no error.
func LoadAvg() (float64, error) {
content, err := os.ReadFile("/proc/loadavg")
if err != nil {
return 0, err
}
return parseLoadAvg(string(content))
}
25 changes: 25 additions & 0 deletions go/osutil/loadavg_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//go:build !linux && !darwin

/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package osutil

// LoadAvg returns the past 1 minute system load average. This works on linux and darwin systems.
// On other systems, it returns 0 with no error.
func LoadAvg() (float64, error) {
return 0, nil
}
77 changes: 77 additions & 0 deletions go/osutil/loadavg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package osutil

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLoadAvgValue(t *testing.T) {
tcases := []struct {
input string
loadavg float64
isError bool
}{
{
input: "",
isError: true,
},
{
input: "{}",
isError: true,
},
{
input: "{ x y z }",
isError: true,
},
{
input: "1",
loadavg: 1.0,
},
{
input: "0.00 0.00 0.00 1/1 1",
loadavg: 0.0,
},
{
input: "2.72 2.89 3.17",
loadavg: 2.72,
},
{
input: " 2.72 2.89 3.17",
loadavg: 2.72,
},
}
for _, tcase := range tcases {
t.Run(tcase.input, func(t *testing.T) {
loadavg, err := parseLoadAvg(tcase.input)
if tcase.isError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tcase.loadavg, loadavg)
}
})
}
}

func TestLoadAvg(t *testing.T) {
loadavg, err := LoadAvg()
assert.NoError(t, err)
assert.GreaterOrEqual(t, loadavg, 0.0)
}
64 changes: 53 additions & 11 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod
}

// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check
func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) bool {
func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) (*tabletmanagerdatapb.CheckThrottlerResponse, bool) {
_ = warmUpHeartbeat(t)
ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4)
defer cancel()
Expand All @@ -229,11 +229,11 @@ func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode
if wantCode == resp.Check.ResponseCode {
// Wait for any cached check values to be cleared and the new
// status value to be in effect everywhere before returning.
return true
return resp.Check, true
}
select {
case <-ctx.Done():
return assert.EqualValues(t, wantCode, resp.Check.StatusCode, "response: %+v", resp)
return resp.Check, false
case <-ticker.C:
}
}
Expand Down Expand Up @@ -779,16 +779,16 @@ func TestUpdateAppCheckedMetrics(t *testing.T) {
}
waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED)
})
t.Run("assigning 'loadavg' metrics to 'test' app", func(t *testing.T) {
t.Run("assigning 'threads_running' metrics to 'test' app", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: "loadavg", Threshold: 7777}
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.ThreadsRunningMetricName.String(), Threshold: 7777}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{}
appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{
testAppName.String(): {Names: []string{"loadavg"}},
testAppName.String(): {Names: []string{base.ThreadsRunningMetricName.String()}},
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics)
assert.NoError(t, err)
Expand All @@ -802,18 +802,18 @@ func TestUpdateAppCheckedMetrics(t *testing.T) {
for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} {
throttler.WaitForThrottlerStatusEnabled(t, &clusterInstance.VtctldClientProcess, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout)
}
t.Run("validating OK response from throttler since it's checking loadavg", func(t *testing.T) {
if !waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK) {
t.Run("validating OK response from throttler since it's checking threads_running", func(t *testing.T) {
if _, ok := waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK); !ok {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})
})
t.Run("assigning 'loadavg,lag' metrics to 'test' app", func(t *testing.T) {
t.Run("assigning 'threads_running,lag' metrics to 'test' app", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{}
appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{
testAppName.String(): {Names: []string{"loadavg,lag"}},
testAppName.String(): {Names: []string{base.ThreadsRunningMetricName.String(), base.LagMetricName.String()}},
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics)
assert.NoError(t, err)
Expand All @@ -831,9 +831,51 @@ func TestUpdateAppCheckedMetrics(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED)
})
})
t.Run("assigning 'mysqld-loadavg,mysqld-datadir-used-ratio' metrics to 'test' app", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.MysqldDatadirUsedRatioMetricName.String(), Threshold: 0.9999}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.MysqldLoadAvgMetricName.String(), Threshold: 5555}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{}
appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{
testAppName.String(): {Names: []string{base.MysqldDatadirUsedRatioMetricName.String(), base.MysqldLoadAvgMetricName.String()}},
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics)
assert.NoError(t, err)
}
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{Threshold: extremelyHighThreshold.Seconds()}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
// Wait for the throttler to be enabled everywhere with new config.
for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} {
throttler.WaitForThrottlerStatusEnabled(t, &clusterInstance.VtctldClientProcess, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: extremelyHighThreshold.Seconds()}, throttlerEnabledTimeout)
}
t.Run("validating OK response from throttler since it's checking mysqld-loadavg,mysqld-datadir-used-ratio", func(t *testing.T) {
resp, ok := waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK)
if !ok {
t.Logf("response: %+v", resp)
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
require.Contains(t, resp.Metrics, base.MysqldDatadirUsedRatioMetricName.String())
require.Contains(t, resp.Metrics, base.MysqldLoadAvgMetricName.String())
assert.NotContains(t, resp.Metrics, base.ThreadsRunningMetricName.String())

assert.NotZero(t, resp.Metrics[base.MysqldDatadirUsedRatioMetricName.String()].Value)
})
})
t.Run("removing assignment from 'test' app and restoring defaults", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: "loadavg", Threshold: 0}
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.ThreadsRunningMetricName.String(), Threshold: 0}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat
selfCheckURL := fmt.Sprintf("http://localhost:%d/throttler/check-self", tablet.HTTPPort)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(500 * time.Millisecond)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
Expand All @@ -548,8 +548,10 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat
}
select {
case <-ctx.Done():
t.Errorf("timed out waiting for %s tablet's throttler to return a valid result after %v; last seen value: %+v",
tablet.Alias, timeout, checkResp)
respByte, _ := io.ReadAll(checkResp.Body)
body := string(respByte)
require.Failf(t, "time out", "waiting for %s tablet's throttler to return a valid result after %v; last seen result: %+v",
tablet.Alias, timeout, body)
return
case <-ticker.C:
}
Expand Down
13 changes: 13 additions & 0 deletions go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ func SingleWordCamel(w string) string {
return strings.ToUpper(w[0:1]) + strings.ToLower(w[1:])
}

var multiWordSplitterRegexp = regexp.MustCompile(`[-_.\s]+`)

// PascalCase turns a string into PascalCase by splitting it into words and
// capitalizing the first letter of each word.
func PascalCase(w string) string {
var b strings.Builder
words := multiWordSplitterRegexp.Split(w, -1)
for _, word := range words {
b.WriteString(SingleWordCamel(word))
}
return b.String()
}

// ValueIsSimulatedNull returns true if the slice value represents
// a NULL or unknown/unspecified value. This is used to distinguish
// between a zero value empty slice and a user provided value of an
Expand Down
Loading

0 comments on commit fb79106

Please sign in to comment.