From 2788487b2987aa99c45283654d4f40ddea93d9e1 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 27 Feb 2024 07:46:55 -0500 Subject: [PATCH] Cherry-pick aba0d83c8ae23c493fe2af76b39d68e872c3ab86 with conflicts --- go/vt/topo/memorytopo/election.go | 12 +++++++++--- go/vt/topo/memorytopo/lock.go | 2 +- go/vt/topo/memorytopo/memorytopo.go | 13 +++++++++++-- go/vt/topo/memorytopo/watch.go | 12 ++++++++++++ 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 868a2c53287..81a8604e896 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -26,7 +26,13 @@ import ( // NewLeaderParticipation is part of the topo.Server interface func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { +<<<<<<< HEAD if c.closed { +======= + c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1) + + if c.closed.Load() { +>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365)) return nil, ErrConnectionClosed } @@ -72,7 +78,7 @@ type cLeaderParticipation struct { // WaitForLeadership is part of the topo.LeaderParticipation interface. func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } @@ -120,7 +126,7 @@ func (mp *cLeaderParticipation) Stop() { // GetCurrentLeaderID is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return "", ErrConnectionClosed } @@ -139,7 +145,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, // WaitForNewLeader is part of the topo.LeaderParticipation interface func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { - if mp.c.closed { + if mp.c.closed.Load() { return nil, ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index c15fb9099bb..5c2a2462495 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -112,7 +112,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error { } func (c *Conn) unlock(ctx context.Context, dirPath string) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 504f1d4bd39..c1c63dffc80 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -25,7 +25,11 @@ import ( "math/rand" "strings" "sync" +<<<<<<< HEAD "time" +======= + "sync/atomic" +>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365)) "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -124,14 +128,14 @@ type Conn struct { factory *Factory cell string serverAddr string - closed bool + closed atomic.Bool } // dial returns immediately, unless the Conn points to the sentinel // UnreachableServerAddr, in which case it will block until the context expires // and return the context's error. func (c *Conn) dial(ctx context.Context) error { - if c.closed { + if c.closed.Load() { return ErrConnectionClosed } if c.serverAddr == UnreachableServerAddr { @@ -144,7 +148,12 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { +<<<<<<< HEAD c.closed = true +======= + c.factory.callstats.Add([]string{"Close"}, 1) + c.closed.Store(true) +>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365)) } type watch struct { diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 73b2d248434..0d568134c66 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -25,7 +25,13 @@ import ( // Watch is part of the topo.Conn interface. func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { +<<<<<<< HEAD if c.closed { +======= + c.factory.callstats.Add([]string{"Watch"}, 1) + + if c.closed.Load() { +>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365)) return nil, nil, ErrConnectionClosed } @@ -75,7 +81,13 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c // WatchRecursive is part of the topo.Conn interface. func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { +<<<<<<< HEAD if c.closed { +======= + c.factory.callstats.Add([]string{"WatchRecursive"}, 1) + + if c.closed.Load() { +>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365)) return nil, nil, ErrConnectionClosed }