Skip to content

Commit

Permalink
Hotfix #120 (#136)
Browse files Browse the repository at this point in the history
* cluster: fix deadlock in cluster synchronisation (#120)

For a impressively thorough breakdown of the problem, see:
	#120 (comment)

Huge thanks to @dvic and @KJTsanaktsidis for the report and fix.

* readme: credit @dvic and @KJTsanaktsidis
  • Loading branch information
domodwyer committed Apr 3, 2018
1 parent baa28fc commit f76e4f9
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
* Gracefully recover from a temporarily unreachable server ([details](https://github.com/globalsign/mgo/pull/69))
* Use JSON tags when no explicit BSON are tags set ([details](https://github.com/globalsign/mgo/pull/91))
* Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97))
* Fix deadlock in cluster synchronisation ([details](https://github.com/globalsign/mgo/issues/120))

---

Expand All @@ -46,11 +47,13 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
* @carter2000
* @cezarsa
* @drichelson
* @dvic
* @eaglerayp
* @feliixx
* @fmpwizard
* @idy
* @jameinel
* @KJTsanaktsidis
* @gazoon
* @mapete94
* @peterdeka
Expand Down
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul
})
})

err := session.Run(cmd, result)
err := session.runOnSocket(socket, cmd, result)
session.Close()
return err
}
Expand Down
35 changes: 35 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,41 @@ func (s *S) TestConnectCloseConcurrency(c *C) {
wg.Wait()
}

func (s *S) TestNoDeadlockOnClose(c *C) {
if *fast {
// Unfortunately I seem to need quite a high dial timeout to get this to work
// on my machine.
c.Skip("-fast")
}

var shouldStop int32
atomic.StoreInt32(&shouldStop, 0)

listener, err := net.Listen("tcp4", "127.0.0.1:")
c.Check(err, Equals, nil)

go func() {
for atomic.LoadInt32(&shouldStop) == 0 {
sock, err := listener.Accept()
if err != nil {
// Probs just closed
continue
}
sock.Close()
}
}()
defer func() {
atomic.StoreInt32(&shouldStop, 1)
listener.Close()
}()

session, err := mgo.DialWithTimeout(listener.Addr().String(), 10*time.Second)
// If execution reaches here, the deadlock did not happen and all is OK
if session != nil {
session.Close()
}
}

func (s *S) TestSelectServers(c *C) {
if !s.versionAtLeast(2, 2) {
c.Skip("read preferences introduced in 2.2")
Expand Down
16 changes: 16 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,15 @@ func (db *Database) Run(cmd interface{}, result interface{}) error {
return db.run(socket, cmd, result)
}

// runOnSocket does the same as Run, but guarantees that your command will be run
// on the provided socket instance; if it's unhealthy, you will receive the error
// from it.
func (db *Database) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error {
socket.Acquire()
defer socket.Release()
return db.run(socket, cmd, result)
}

// Credential holds details to authenticate with a MongoDB server.
type Credential struct {
// Username and Password hold the basic details for authentication.
Expand Down Expand Up @@ -2270,6 +2279,13 @@ func (s *Session) Run(cmd interface{}, result interface{}) error {
return s.DB("admin").Run(cmd, result)
}

// runOnSocket does the same as Run, but guarantees that your command will be run
// on the provided socket instance; if it's unhealthy, you will receive the error
// from it.
func (s *Session) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error {
return s.DB("admin").runOnSocket(socket, cmd, result)
}

// SelectServers restricts communication to servers configured with the
// given tags. For example, the following statement restricts servers
// used for reading operations to those with both tag "disk" set to
Expand Down

0 comments on commit f76e4f9

Please sign in to comment.