diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index d79e2964f3e..cfbb6a69a67 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -495,3 +495,78 @@ func TestDurabilityPolicySetLater(t *testing.T) { assert.NotNil(t, primary, "should have elected a primary") utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 10*time.Second) } + +// TestFullStatusConnectionPooling tests that full status RPC succeeds despite a vttablet restarting with a different +// IP address and then back to its original. This test has been added in response to a bug seen in production with a similar situation occurring. +func TestFullStatusConnectionPooling(t *testing.T) { + defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance) + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{ + "--tablet_manager_grpc_concurrency=1", + }, cluster.VTOrcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1, "") + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + + // find primary from topo + curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) + utils.WaitForSuccessfulPRSCount(t, vtOrcProcess, keyspace.Name, shard0.Name, 1) + + // Kill the current primary. 
+ _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + // We have to wait for some time to ensure the gRPC connections from VTOrc to vttablet + // are broken and closed due to keep-alives. Without this timeout the gRPC connections stay open and test passes trivially. + time.Sleep(1 * time.Minute) + + // Change the primary's ports and restart it. + curPrimary.VttabletProcess.Port = clusterInfo.ClusterInstance.GetAndReservePort() + curPrimary.VttabletProcess.GrpcPort = clusterInfo.ClusterInstance.GetAndReservePort() + err := curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. + // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) + + // REPEATED + // Kill the current primary. + _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + // Change the primary's ports back to original and restart it. + curPrimary.VttabletProcess.Port = curPrimary.HTTPPort + curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort + err = curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. 
+ // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) +} diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index dca2c7b1e26..00f75740338 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -733,7 +733,7 @@ func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status // The function provided takes in the status and response and returns if we should continue to retry or not func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, retry func(int, string) bool) (status int, response string) { t.Helper() - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) for { select { case <-timeout: diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index d8ae032bd74..4c795cc70a9 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -108,7 +108,7 @@ type dialer interface { } type poolDialer interface { - dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) + dialPool(ctx context.Context, tablet *topodatapb.Tablet) (c tabletmanagerservicepb.TabletManagerClient, invalidator func(error), err error) } // Client implements tmclient.TabletManagerClient. 
@@ -152,11 +152,11 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) ( return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil } -func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) { +func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, func(error) /* map invalidator */, error) { addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"])) opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name) if err != nil { - return nil, err + return nil, nil, err } client.mu.Lock() @@ -172,7 +172,7 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table for i := 0; i < cap(c); i++ { cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt) if err != nil { - return nil, err + return nil, nil, err } c <- &tmc{ cc: cc, @@ -185,7 +185,20 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table result := <-c c <- result - return result.client, nil + invalidator := func(err error) { + if err == nil { + return + } + // A bit aggressively, we close connections and delete the client from cache + // upon any error. This is specifically to solve situation where gRPC communication + // is broken, but at this time we don't have a good way to distinguish between + // gRPC errors and other, "normal" errors. + client.mu.Lock() + defer client.mu.Unlock() + result.cc.Close() + delete(client.rpcClientMap, addr) + } + return result.client, invalidator, nil } // Close is part of the tmclient.TabletManagerClient interface. @@ -472,7 +485,7 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. 
var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -508,7 +521,7 @@ func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topoda var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -564,7 +577,7 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -612,8 +625,9 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb. func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -630,6 +644,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{}) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response.Status, nil @@ -1102,8 +1119,9 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) 
if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -1120,6 +1138,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab response, err := c.CheckThrottler(ctx, req) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response, nil