Skip to content

Commit

Permalink
Cherry-pick aba0d83 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Feb 27, 2024
1 parent 792e16c commit 2788487
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 6 deletions.
12 changes: 9 additions & 3 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ import (

// NewLeaderParticipation is part of the topo.Server interface
func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) {
<<<<<<< HEAD
if c.closed {
=======
c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1)

if c.closed.Load() {
>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365))
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -72,7 +78,7 @@ type cLeaderParticipation struct {

// WaitForLeadership is part of the topo.LeaderParticipation interface.
func (mp *cLeaderParticipation) WaitForLeadership() (context.Context, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -120,7 +126,7 @@ func (mp *cLeaderParticipation) Stop() {

// GetCurrentLeaderID is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return "", ErrConnectionClosed
}

Expand All @@ -139,7 +145,7 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string,

// WaitForNewLeader is part of the topo.LeaderParticipation interface
func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) {
if mp.c.closed {
if mp.c.closed.Load() {
return nil, ErrConnectionClosed
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (ld *memoryTopoLockDescriptor) Unlock(ctx context.Context) error {
}

func (c *Conn) unlock(ctx context.Context, dirPath string) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}

Expand Down
13 changes: 11 additions & 2 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import (
"math/rand"
"strings"
"sync"
<<<<<<< HEAD
"time"
=======
"sync/atomic"
>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365))

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -124,14 +128,14 @@ type Conn struct {
factory *Factory
cell string
serverAddr string
closed bool
closed atomic.Bool
}

// dial returns immediately, unless the Conn points to the sentinel
// UnreachableServerAddr, in which case it will block until the context expires
// and return the context's error.
func (c *Conn) dial(ctx context.Context) error {
if c.closed {
if c.closed.Load() {
return ErrConnectionClosed
}
if c.serverAddr == UnreachableServerAddr {
Expand All @@ -144,7 +148,12 @@ func (c *Conn) dial(ctx context.Context) error {

// Close is part of the topo.Conn interface.
func (c *Conn) Close() {
<<<<<<< HEAD
c.closed = true
=======
c.factory.callstats.Add([]string{"Close"}, 1)
c.closed.Store(true)
>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365))
}

type watch struct {
Expand Down
12 changes: 12 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ import (

// Watch is part of the topo.Conn interface.
func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) {
<<<<<<< HEAD
if c.closed {
=======
c.factory.callstats.Add([]string{"Watch"}, 1)

if c.closed.Load() {
>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365))
return nil, nil, ErrConnectionClosed
}

Expand Down Expand Up @@ -75,7 +81,13 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c

// WatchRecursive is part of the topo.Conn interface.
func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) {
<<<<<<< HEAD
if c.closed {
=======
c.factory.callstats.Add([]string{"WatchRecursive"}, 1)

if c.closed.Load() {
>>>>>>> aba0d83c8a (CI: Address data races on memorytopo Conn.closed (#15365))
return nil, nil, ErrConnectionClosed
}

Expand Down

0 comments on commit 2788487

Please sign in to comment.