Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interop: improve rpc_soak and channel_soak test to cover concurrency in Go #7926

Merged
merged 45 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
fd83eee
Improve the rpc soak test and channel soak test to cover concurrency …
zbilun Dec 12, 2024
0bbe38b
Remove .idea/ directory from Git tracking
zbilun Dec 12, 2024
219e17e
Fix the style issue
zbilun Dec 12, 2024
ead030a
Remove .idea/ from .gitignore
zbilun Dec 12, 2024
cbddddc
Add .gitignore file
zbilun Dec 12, 2024
be47357
fix channel close issue
zbilun Dec 12, 2024
bfd1b8e
Stop tracking .idea directory
zbilun Dec 12, 2024
3929ecf
Fix the comments style.
zbilun Dec 13, 2024
742ed86
Fix the go style issues.
zbilun Dec 17, 2024
9fdcd56
Fix the go style issues about the space.
zbilun Dec 17, 2024
b718300
Debug for test failure.
zbilun Dec 17, 2024
2df82f8
Replace long parameter lists with struct for cleaner code.
zbilun Dec 17, 2024
2d72173
Clean code by deleting useless comments.
zbilun Dec 17, 2024
a0a3ae2
Fix the test fail
zbilun Dec 17, 2024
957c1b4
Fix the format check
zbilun Dec 17, 2024
67948ce
Fix the p.addr
zbilun Dec 17, 2024
f03b6fe
Change the addrStr back due to the print type issue.
zbilun Dec 17, 2024
f621f31
Clean comments.
zbilun Dec 17, 2024
2204ae8
Add print message.
zbilun Dec 17, 2024
ca86c16
Debug the test fail
zbilun Dec 18, 2024
22c5544
Duplicate print error
zbilun Dec 18, 2024
04282c5
Refactor the doSoakTest func.
zbilun Dec 18, 2024
959c594
Clean comments.
zbilun Dec 18, 2024
895af05
Clean empty line.
zbilun Dec 18, 2024
262b98e
Address the second round comments.
zbilun Dec 29, 2024
1282e49
Fix the format issues.
zbilun Dec 29, 2024
5ef8eca
Fix the naming check.
zbilun Dec 29, 2024
64fc618
Remove .gitignore file from repository
zbilun Dec 29, 2024
85e2438
Refactor the common config.
zbilun Dec 30, 2024
0395c79
Clean comments.
zbilun Dec 30, 2024
8d9fb6b
Clean comments.
zbilun Jan 8, 2025
63e8250
Fix check issues.
zbilun Jan 8, 2025
9c60ca8
Improve latency handling and clean comments.
zbilun Jan 8, 2025
c0d6238
Update latency var name.
zbilun Jan 8, 2025
24a233a
Clean comments.
zbilun Jan 8, 2025
5ef9068
Refactor the big files.
zbilun Jan 9, 2025
f680c8f
Add copyrights.
zbilun Jan 9, 2025
f4cfaf9
Modify copyrights.
zbilun Jan 9, 2025
eacc6e0
Modify imports.
zbilun Jan 9, 2025
ca01d13
Clean comments.
zbilun Jan 9, 2025
8265b0b
Clean comments.
zbilun Jan 9, 2025
76bfa65
Address comments.
zbilun Jan 9, 2025
51b457d
Modify time.After limitation.
zbilun Jan 9, 2025
3c8b2f4
Revert wrong changes.
zbilun Jan 9, 2025
823ae79
Clean code.
zbilun Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions interop/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"crypto/tls"
"crypto/x509"
"flag"
"log"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -79,6 +80,7 @@ var (
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak). The default value is set based on the interop large unary test case.")
zbilun marked this conversation as resolved.
Show resolved Hide resolved
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
testCase = flag.String("test_case", "large_unary",
Expand Down Expand Up @@ -149,6 +151,21 @@ func parseAdditionalMetadataFlag() []string {
return addMd
}

// createBaseSoakConfig builds the soak-test configuration shared by the
// rpc_soak and channel_soak test cases from the command-line flags. Callers
// fill in ChannelForTest to choose the channel strategy for their variant.
func createBaseSoakConfig(serverAddr string) interop.SoakTestConfig {
	return interop.SoakTestConfig{
		RequestSize:                      *soakRequestSize,
		ResponseSize:                     *soakResponseSize,
		PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond,
		MinTimeBetweenRPCs:               time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond,
		OverallTimeout:                   time.Duration(*soakOverallTimeoutSeconds) * time.Second,
		ServerAddr:                       serverAddr,
		NumWorkers:                       *soakNumThreads,
		Iterations:                       *soakIterations,
		MaxFailures:                      *soakMaxFailures,
	}
}

func main() {
flag.Parse()
logger.Infof("Client running with test case %q", *testCase)
Expand Down Expand Up @@ -261,7 +278,7 @@ func main() {
}
opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd))
}
conn, err := grpc.Dial(serverAddr, opts...)
conn, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
logger.Fatalf("Fail to dial: %v", err)
}
Expand Down Expand Up @@ -358,10 +375,20 @@ func main() {
interop.DoPickFirstUnary(ctx, tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
rpcSoakConfig := createBaseSoakConfig(serverAddr)
rpcSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) { return conn, func() {} }
interop.DoSoakTest(ctxWithDeadline, rpcSoakConfig)
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
channelSoakConfig := createBaseSoakConfig(serverAddr)
channelSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) {
cc, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
log.Fatalf("Failed to create shared channel: %v", err)
}
return cc, func() { cc.Close() }
}
interop.DoSoakTest(ctxWithDeadline, channelSoakConfig)
logger.Infoln("ChannelSoak done")
case "orca_per_rpc":
interop.DoORCAPerRPCTest(ctx, tc)
Expand Down
193 changes: 193 additions & 0 deletions interop/soak_tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package interop

import (
"bytes"
"context"
"fmt"
"os"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/peer"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

// SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
type SoakWorkerResults struct {
	IterationsDone int              // Number of iterations that completed successfully.
	Failures       int              // Iterations that failed or exceeded the max acceptable latency.
	Latencies      *stats.Histogram // Per-iteration latencies in milliseconds; may be nil if never populated.
}

// SoakIterationConfig holds the parameters required for a single soak iteration.
type SoakIterationConfig struct {
	RequestSize  int                        // The size of the request payload in bytes.
	ResponseSize int                        // The expected size of the response payload in bytes.
	Client       testgrpc.TestServiceClient // The gRPC client to make the call.
	CallOptions  []grpc.CallOption          // Call options for the RPC (e.g. grpc.Peer to capture the peer address).
}

// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
	RequestSize                      int           // Request payload size in bytes for every iteration.
	ResponseSize                     int           // Expected response payload size in bytes.
	PerIterationMaxAcceptableLatency time.Duration // Iterations slower than this are counted as failures.
	MinTimeBetweenRPCs               time.Duration // Minimum delay between the starts of consecutive RPCs.
	OverallTimeout                   time.Duration // Wall-clock budget for the whole test.
	ServerAddr                       string        // Server address; used in log/error output.
	NumWorkers                       int           // Number of concurrent workers; Iterations is split evenly across them.
	Iterations                       int           // Total number of iterations across all workers.
	MaxFailures                      int           // Failure threshold checked when the test finishes.
	// ChannelForTest returns the channel to use for an iteration together with
	// a cleanup function, letting the caller choose between a shared channel
	// (rpc_soak) and a fresh channel per call (channel_soak).
	ChannelForTest func() (*grpc.ClientConn, func())
}

// doOneSoakIteration performs a single large-unary soak RPC and reports how
// long the call took. On RPC failure or a malformed reply it returns a zero
// latency together with a non-nil error.
func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
	start := time.Now()
	// Build the large-unary request with the configured payload sizes.
	payload := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
	request := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(config.ResponseSize),
		Payload:      payload,
	}
	// Issue the RPC.
	reply, rpcErr := config.Client.UnaryCall(ctx, request, config.CallOptions...)
	if rpcErr != nil {
		return 0, fmt.Errorf("/TestService/UnaryCall RPC failed: %s", rpcErr)
	}
	// Reject replies whose payload type or body length differ from what was requested.
	gotType := reply.GetPayload().GetType()
	gotLen := len(reply.GetPayload().GetBody())
	if gotType != testpb.PayloadType_COMPRESSABLE || gotLen != config.ResponseSize {
		return 0, fmt.Errorf("got the reply with type %d len %d; want %d, %d", gotType, gotLen, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
	}
	return time.Since(start), nil
}

// executeSoakTestInWorker runs this worker's share of the soak iterations
// (config.Iterations / config.NumWorkers), recording latency and failure
// counts into soakWorkerResults. It stops early when ctx is canceled or the
// overall timeout, measured from startTime, has elapsed.
func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
	timeoutDuration := config.OverallTimeout
	soakIterationsPerWorker := config.Iterations / config.NumWorkers
	// Lazily create the latency histogram: DoSoakTest allocates zero-value
	// SoakWorkerResults, so without this the Add below would nil-panic on the
	// first successful iteration.
	if soakWorkerResults.Latencies == nil {
		soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
			NumBuckets:     20,
			GrowthFactor:   1,
			BaseBucketSize: 1,
			MinValue:       0,
		})
	}
	for i := 0; i < soakIterationsPerWorker; i++ {
		if ctx.Err() != nil {
			return
		}
		if time.Since(startTime) >= timeoutDuration {
			fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
			return
		}
		// Rate limiting: the next iteration must not start before this fires.
		earliestNextStart := time.After(config.MinTimeBetweenRPCs)
		currentChannel, cleanup := config.ChannelForTest()
		client := testgrpc.NewTestServiceClient(currentChannel)
		var p peer.Peer
		iterationConfig := SoakIterationConfig{
			RequestSize:  config.RequestSize,
			ResponseSize: config.ResponseSize,
			Client:       client,
			CallOptions:  []grpc.CallOption{grpc.Peer(&p)},
		}
		latency, err := doOneSoakIteration(ctx, iterationConfig)
		// Release the per-iteration channel immediately instead of deferring:
		// a defer here would pile up one pending cleanup per iteration and, for
		// channel_soak, keep every connection open until the worker returns.
		cleanup()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err)
			soakWorkerResults.Failures++
			<-earliestNextStart
			continue
		}
		// Convert once so the "elapsed_ms" fields really are milliseconds
		// (printing the raw time.Duration with %d would emit nanoseconds).
		latencyMs := latency.Milliseconds()
		if latency > config.PerIterationMaxAcceptableLatency {
			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latencyMs, p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
			soakWorkerResults.Failures++
			<-earliestNextStart
			continue
		}
		// Success: record the latency and log the details of the iteration.
		soakWorkerResults.Latencies.Add(latencyMs)
		soakWorkerResults.IterationsDone++
		fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latencyMs, p.Addr, config.ServerAddr)
		<-earliestNextStart
	}
}

// DoSoakTest runs large-unary RPCs in a loop for a configurable number of
// iterations, spread evenly across soakConfig.NumWorkers concurrent workers,
// then reports aggregate iteration, failure, and latency statistics. Each
// worker obtains channels via soakConfig.ChannelForTest, so the caller decides
// whether a shared channel (rpc_soak) or a fresh channel per iteration
// (channel_soak) is used.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
	if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
		// Each worker runs Iterations/NumWorkers iterations, so a remainder
		// silently lowers the total below soakConfig.Iterations.
		fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumThreads\n")
	}
	startTime := time.Now()
	var wg sync.WaitGroup
	soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
	for i := 0; i < soakConfig.NumWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
		}(i)
	}
	// Wait for all worker goroutines to complete.
	wg.Wait()

	// Aggregate results across workers.
	totalIterations := 0
	totalFailures := 0
	latencies := stats.NewHistogram(stats.HistogramOptions{
		NumBuckets:     20,
		GrowthFactor:   1,
		BaseBucketSize: 1,
		MinValue:       0,
	})
	for _, worker := range soakWorkerResults {
		totalIterations += worker.IterationsDone
		totalFailures += worker.Failures
		// Latencies is nil for a worker that never recorded an iteration.
		if worker.Latencies != nil {
			// Add latencies from the worker's Histogram to the main latencies.
			latencies.Merge(worker.Latencies)
		}
	}
	var b bytes.Buffer
	latencies.Print(&b)
	fmt.Fprintf(os.Stderr,
		"(server_uri: %s) soak test ran: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
		soakConfig.ServerAddr, totalIterations, soakConfig.Iterations, totalFailures, b.String())

	if totalIterations != soakConfig.Iterations {
		fmt.Fprintf(os.Stderr, "Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
	}

	if totalFailures > soakConfig.MaxFailures {
		fmt.Fprintf(os.Stderr, "Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
	}
	// NOTE(review): this obtains one extra channel purely to run its cleanup
	// (which is a no-op for rpc_soak and opens-then-closes a fresh channel for
	// channel_soak) — confirm this is the intended shutdown behavior.
	if soakConfig.ChannelForTest != nil {
		_, cleanup := soakConfig.ChannelForTest()
		defer cleanup()
	}
}
97 changes: 0 additions & 97 deletions interop/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
package interop

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -36,12 +35,10 @@ import (
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -684,100 +681,6 @@ func DoPickFirstUnary(ctx context.Context, tc testgrpc.TestServiceClient) {
}
}

// doOneSoakIteration performs one large-unary soak RPC and returns its
// latency. When resetChannel is true the RPC is made on a brand-new connection
// to serverAddr (closed before returning, after latency is captured);
// otherwise it reuses tc.
func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
	start := time.Now()
	client := tc
	if resetChannel {
		var conn *grpc.ClientConn
		conn, err = grpc.Dial(serverAddr, dopts...)
		if err != nil {
			return
		}
		defer conn.Close()
		client = testgrpc.NewTestServiceClient(conn)
	}
	// per test spec, don't include channel shutdown in latency measurement
	// (this defer was registered after conn.Close, so it runs first).
	defer func() { latency = time.Since(start) }()
	// do a large-unary RPC
	pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize)
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(soakResponseSize),
		Payload:      pl,
	}
	var reply *testpb.SimpleResponse
	reply, err = client.UnaryCall(ctx, req, copts...)
	if err != nil {
		err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
		return
	}
	// Validate the reply's payload type and body length against the request.
	t := reply.GetPayload().GetType()
	s := len(reply.GetPayload().GetBody())
	if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize {
		err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize)
		return
	}
	return
}

// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(ctx context.Context, tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration) {
	start := time.Now()
	var elapsedTime float64
	iterationsDone := 0
	totalFailures := 0
	// Histogram of per-iteration latencies, in milliseconds.
	hopts := stats.HistogramOptions{
		NumBuckets:     20,
		GrowthFactor:   1,
		BaseBucketSize: 1,
		MinValue:       0,
	}
	h := stats.NewHistogram(hopts)
	for i := 0; i < soakIterations; i++ {
		// Stop early once the caller's context (overall deadline) expires.
		if ctx.Err() != nil {
			elapsedTime = time.Since(start).Seconds()
			break
		}
		// Rate limiting: the next iteration must not start before this fires.
		earliestNextStart := time.After(minTimeBetweenRPCs)
		iterationsDone++
		var p peer.Peer
		latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)})
		latencyMs := int64(latency / time.Millisecond)
		h.Add(latencyMs)
		if err != nil {
			totalFailures++
			// p.Addr may be unset when the RPC failed before reaching a peer.
			addrStr := "nil"
			if p.Addr != nil {
				addrStr = p.Addr.String()
			}
			fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
			<-earliestNextStart
			continue
		}
		if latency > perIterationMaxAcceptableLatency {
			totalFailures++
			fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
			<-earliestNextStart
			continue
		}
		fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr)
		<-earliestNextStart
	}
	// Print the latency histogram and the overall summary, then enforce the
	// completion and failure thresholds (Fatalf exits the process).
	var b bytes.Buffer
	h.Print(&b)
	fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String())
	fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures)
	if iterationsDone < soakIterations {
		logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, elapsedTime, iterationsDone, soakIterations)
	}
	if totalFailures > maxFailures {
		logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures)
	}
}

type testServer struct {
testgrpc.UnimplementedTestServiceServer

Expand Down
Loading
Loading