From 172a5dd3c60db7766a591737bacba17b2286d9cd Mon Sep 17 00:00:00 2001 From: Maxime Ginters Date: Fri, 25 Oct 2024 10:04:01 -0400 Subject: [PATCH 1/2] Properly close zk.Conn while dereferencing it --- go/vt/topo/zk2topo/zk_conn.go | 13 +++--- go/vt/topo/zk2topo/zk_conn_test.go | 64 ++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 8 deletions(-) create mode 100644 go/vt/topo/zk2topo/zk_conn_test.go diff --git a/go/vt/topo/zk2topo/zk_conn.go b/go/vt/topo/zk2topo/zk_conn.go index a0eec8b4340..60ba00a2bf6 100644 --- a/go/vt/topo/zk2topo/zk_conn.go +++ b/go/vt/topo/zk2topo/zk_conn.go @@ -277,6 +277,8 @@ func (c *ZkConn) withRetry(ctx context.Context, action func(conn *zk.Conn) error c.conn = nil } c.mu.Unlock() + log.Infof("zk conn: got ErrConnectionClosed: closing") + conn.Close() } return } @@ -327,13 +329,9 @@ func (c *ZkConn) maybeAddAuth(ctx context.Context) { // clears out the connection record. func (c *ZkConn) handleSessionEvents(conn *zk.Conn, session <-chan zk.Event) { for event := range session { - closeRequired := false switch event.State { - case zk.StateExpired, zk.StateConnecting: - closeRequired = true - fallthrough - case zk.StateDisconnected: + case zk.StateDisconnected, zk.StateExpired, zk.StateConnecting: c.mu.Lock() if c.conn == conn { // The ZkConn still references this @@ -341,9 +339,8 @@ func (c *ZkConn) handleSessionEvents(conn *zk.Conn, session <-chan zk.Event) { c.conn = nil } c.mu.Unlock() - if closeRequired { - conn.Close() - } + log.Infof("zk conn: got %v: closing", event.State) + conn.Close() log.Infof("zk conn: session for addr %v ended: %v", c.addr, event) return } diff --git a/go/vt/topo/zk2topo/zk_conn_test.go b/go/vt/topo/zk2topo/zk_conn_test.go new file mode 100644 index 00000000000..0912294fd52 --- /dev/null +++ b/go/vt/topo/zk2topo/zk_conn_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package zk2topo + +import ( + "context" + "testing" + + "github.com/z-division/go-zookeeper/zk" + + "vitess.io/vitess/go/testfiles" + "vitess.io/vitess/go/vt/zkctl" +) + +func TestZkConnClosedOnDisconnect(t *testing.T) { + zkd, serverAddr := zkctl.StartLocalZk(testfiles.GoVtTopoZk2topoZkID, testfiles.GoVtTopoZk2topoPort) + defer zkd.Teardown() + + conn := Connect(serverAddr) + _, _, err := conn.Get(context.Background(), "/") + if err != nil { + t.Fatalf("Get() failed: %v", err) + } + + if !conn.conn.State().IsConnected() { + t.Fatalf("Connection not connected: %v", conn.conn.State()) + } + + oldConn := conn.conn + + // simulate a disconnect + zkd.Shutdown() + zkd.Start() + + // do another get to trigger a new connection + _, _, err = conn.Get(context.Background(), "/") + if err != nil { + t.Fatalf("Get() failed: %v", err) + } + + // Check that old connection is closed + _, _, err = oldConn.Get("/") + if err == nil { + t.Fatalf("Get() should have failed: %v", err) + } + + if oldConn.State() != zk.StateDisconnected { + t.Fatalf("Connection not closed: %v", oldConn.State()) + } +} From d7d64927f8cf0c53c56b070f92c64c1537d1fc41 Mon Sep 17 00:00:00 2001 From: shanth96 Date: Mon, 28 Oct 2024 19:10:28 -0400 Subject: [PATCH 2/2] feedback --- go/vt/topo/zk2topo/zk_conn_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/zk2topo/zk_conn_test.go b/go/vt/topo/zk2topo/zk_conn_test.go index 0912294fd52..de66f7352ce 100644 --- a/go/vt/topo/zk2topo/zk_conn_test.go +++ b/go/vt/topo/zk2topo/zk_conn_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/stretchr/testify/require" "github.com/z-division/go-zookeeper/zk" "vitess.io/vitess/go/testfiles" @@ -31,6 +32,8 @@ func TestZkConnClosedOnDisconnect(t *testing.T) { defer zkd.Teardown() conn := Connect(serverAddr) + defer conn.Close() + _, _, err := conn.Get(context.Background(), "/") if err != nil { t.Fatalf("Get() failed: %v", err) @@ -42,7 +45,7 @@ func TestZkConnClosedOnDisconnect(t *testing.T) { oldConn := conn.conn - // simulate a disconnect + // force a disconnect zkd.Shutdown() zkd.Start() @@ -54,11 +57,9 @@ func TestZkConnClosedOnDisconnect(t *testing.T) { // Check that old connection is closed _, _, err = oldConn.Get("/") - if err == nil { - t.Fatalf("Get() should have failed: %v", err) - } + require.ErrorContains(t, err, "zookeeper is closing") if oldConn.State() != zk.StateDisconnected { - t.Fatalf("Connection not closed: %v", oldConn.State()) + t.Fatalf("Connection is not in disconnected state: %v", oldConn.State()) } }