diff --git a/.travis.yml b/.travis.yml index f489c96ac..21468edf2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,5 +46,6 @@ script: - (cd bson && go test -check.v) - go test -check.v -fast - (cd txn && go test -check.v) + - make stopdb # vim:sw=4:ts=4:et diff --git a/README.md b/README.md index ca1de13df..4765043a1 100644 --- a/README.md +++ b/README.md @@ -15,17 +15,23 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2)) * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) -* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11)) -* Integration tests run against newest MongoDB 3.2 releases ([details](https://github.com/globalsign/mgo/pull/4)) * Support for partial indexes ([detials](https://github.com/domodwyer/mgo/commit/5efe8eccb028238d93c222828cae4806aeae9f51)) +* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) +* Integration tests run against newest MongoDB 3.2 releases ([details](https://github.com/globalsign/mgo/pull/4)) +* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) +* Fixes cursor timeouts ([details](https://jira.mongodb.org/browse/SERVER-24899)) +* Support index hints and timeouts for count queries ([details](https://github.com/globalsign/mgo/pull/17)) --- ### Thanks to +* @BenLubar * @carter2000 * @cezarsa -* @eaglerayp * @drichelson +* @eaglerayp +* @fmpwizard * @jameinel +* @Reenjii * @smoya * @wgallagher \ No newline at end of file diff --git a/bson/json.go b/bson/json.go index 3cfa102c4..045c71301 100644 --- a/bson/json.go +++ b/bson/json.go 
@@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "strconv" + "strings" "time" "github.com/globalsign/mgo/internal/json" @@ -156,7 +157,7 @@ func jencBinaryType(v interface{}) ([]byte, error) { return fbytes(`{"$binary":"%s","$type":"0x%x"}`, out, in.Kind), nil } -const jdateFormat = "2006-01-02T15:04:05.999Z" +const jdateFormat = "2006-01-02T15:04:05.999Z07:00" func jdecDate(data []byte) (interface{}, error) { var v struct { @@ -170,13 +171,15 @@ func jdecDate(data []byte) (interface{}, error) { v.S = v.Func.S } if v.S != "" { + var errs []string for _, format := range []string{jdateFormat, "2006-01-02"} { t, err := time.Parse(format, v.S) if err == nil { return t, nil } + errs = append(errs, err.Error()) } - return nil, fmt.Errorf("cannot parse date: %q", v.S) + return nil, fmt.Errorf("cannot parse date: %q [%s]", v.S, strings.Join(errs, ", ")) } var vn struct { diff --git a/bson/json_test.go b/bson/json_test.go index d8ce798f9..880fb87c2 100644 --- a/bson/json_test.go +++ b/bson/json_test.go @@ -34,12 +34,18 @@ var jsonTests = []jsonTest{ { a: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.UTC), b: `{"$date":"2016-05-15T01:02:03.004Z"}`, + }, { + a: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.FixedZone("CET", 60*60)), + b: `{"$date":"2016-05-15T01:02:03.004+01:00"}`, }, { b: `{"$date": {"$numberLong": "1002"}}`, c: time.Date(1970, 1, 1, 0, 0, 1, 2e6, time.UTC), }, { b: `ISODate("2016-05-15T01:02:03.004Z")`, c: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.UTC), + }, { + b: `ISODate("2016-05-15T01:02:03.004-07:00")`, + c: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.FixedZone("PDT", -7*60*60)), }, { b: `new Date(1000)`, c: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC), @@ -180,6 +186,11 @@ func (s *S) TestJSON(c *C) { value = zerov.Elem().Interface() } c.Logf("Loaded: %#v", value) + if ctime, ok := item.c.(time.Time); ok { + // time.Time must be compared with time.Time.Equal and not reflect.DeepEquals + c.Assert(ctime.Equal(value.(time.Time)), Equals, true) + 
continue + } c.Assert(value, DeepEquals, item.c) } } diff --git a/session.go b/session.go index 829aba443..6827dffe5 100644 --- a/session.go +++ b/session.go @@ -3281,20 +3281,23 @@ func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool { } find := findCmd{ - Collection: op.collection[nameDot+1:], - Filter: op.query, - Projection: op.selector, - Sort: op.options.OrderBy, - Skip: op.skip, - Limit: limit, - MaxTimeMS: op.options.MaxTimeMS, - MaxScan: op.options.MaxScan, - Hint: op.options.Hint, - Comment: op.options.Comment, - Snapshot: op.options.Snapshot, - OplogReplay: op.flags&flagLogReplay != 0, - Collation: op.options.Collation, - ReadConcern: readLevel{level: op.readConcern}, + Collection: op.collection[nameDot+1:], + Filter: op.query, + Projection: op.selector, + Sort: op.options.OrderBy, + Skip: op.skip, + Limit: limit, + MaxTimeMS: op.options.MaxTimeMS, + MaxScan: op.options.MaxScan, + Hint: op.options.Hint, + Comment: op.options.Comment, + Snapshot: op.options.Snapshot, + Collation: op.options.Collation, + Tailable: op.flags&flagTailable != 0, + AwaitData: op.flags&flagAwaitData != 0, + OplogReplay: op.flags&flagLogReplay != 0, + NoCursorTimeout: op.flags&flagNoCursorTimeout != 0, + ReadConcern: readLevel{level: op.readConcern}, } if op.limit < 0 { @@ -4083,10 +4086,12 @@ func (iter *Iter) getMoreCmd() *queryOp { } type countCmd struct { - Count string - Query interface{} - Limit int32 ",omitempty" - Skip int32 ",omitempty" + Count string + Query interface{} + Limit int32 ",omitempty" + Skip int32 ",omitempty" + Hint bson.D `bson:"hint,omitempty"` + MaxTimeMS int `bson:"maxTimeMS,omitempty"` } // Count returns the total number of documents in the result set. 
@@ -4108,8 +4113,12 @@ func (q *Query) Count() (n int, err error) { if query == nil { query = bson.D{} } + // not checking the error because if type assertion fails, we + // simply want a Zero bson.D + hint, _ := q.op.options.Hint.(bson.D) result := struct{ N int }{} - err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result) + err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip, hint, op.options.MaxTimeMS}, &result) + return result.N, err } diff --git a/session_test.go b/session_test.go index a4ce5482a..912f1c92a 100644 --- a/session_test.go +++ b/session_test.go @@ -1275,6 +1275,49 @@ func (s *S) TestCountSkipLimit(c *C) { c.Assert(n, Equals, 4) } +func (s *S) TestCountMaxTimeMS(c *C) { + if !s.versionAtLeast(2, 6) { + c.Skip("SetMaxTime only supported in 2.6+") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + ns := make([]int, 100000) + for _, n := range ns { + err := coll.Insert(M{"n": n}) + c.Assert(err, IsNil) + } + _, err = coll.Find(M{"n": M{"$gt": 1}}).SetMaxTime(1 * time.Millisecond).Count() + e := err.(*mgo.QueryError) + // We hope this query took longer than 1 ms, which triggers an error code 50 + c.Assert(e.Code, Equals, 50) + +} + +func (s *S) TestCountHint(c *C) { + if !s.versionAtLeast(2, 6) { + c.Skip("Not implemented until mongo 2.5.5 https://jira.mongodb.org/browse/SERVER-2677") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"n": 1}) + c.Assert(err, IsNil) + + _, err = coll.Find(M{"n": M{"$gt": 1}}).Hint("does_not_exists").Count() + e := err.(*mgo.QueryError) + // If Hint wasn't doing anything, then Count would ignore the non existent index hint + // and return the normal count. 
But we instead get an error code 2: bad hint + c.Assert(e.Code, Equals, 2) +} + func (s *S) TestQueryExplain(c *C) { session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) @@ -1673,7 +1716,7 @@ func (s *S) TestResumeIter(c *C) { c.Assert(len(batch), Equals, 0) } -var cursorTimeout = flag.Bool("cursor-timeout", false, "Enable cursor timeout test") +var cursorTimeout = flag.Bool("cursor-timeout", false, "Enable cursor timeout tests") func (s *S) TestFindIterCursorTimeout(c *C) { if !*cursorTimeout { @@ -1717,6 +1760,56 @@ func (s *S) TestFindIterCursorTimeout(c *C) { c.Assert(iter.Err(), Equals, mgo.ErrCursor) } +func (s *S) TestFindIterCursorNoTimeout(c *C) { + if !*cursorTimeout { + c.Skip("-cursor-timeout") + } + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + session.SetCursorTimeout(0) + + type Doc struct { + Id int "_id" + } + + coll := session.DB("test").C("test") + coll.Remove(nil) + for i := 0; i < 100; i++ { + err = coll.Insert(Doc{i}) + c.Assert(err, IsNil) + } + + session.SetBatch(1) + iter := coll.Find(nil).Iter() + var doc Doc + if !iter.Next(&doc) { + c.Fatalf("iterator failed to return any documents") + } + + for i := 10; i > 0; i-- { + c.Logf("Sleeping... %d minutes to go...", i) + time.Sleep(1*time.Minute + 2*time.Second) + } + + // Drain any existing documents that were fetched. 
+ if !iter.Next(&doc) { + c.Fatalf("iterator failed to return previously cached document") + } + for i := 1; i < 100; i++ { + if !iter.Next(&doc) { + c.Errorf("iterator failed on iteration %d", i) + break + } + } + if iter.Next(&doc) { + c.Error("iterator returned more than 100 documents") + } + + c.Assert(iter.Err(), IsNil) +} + func (s *S) TestTooManyItemsLimitBug(c *C) { if *fast { c.Skip("-fast") diff --git a/txn/flusher.go b/txn/flusher.go index b1ead31d9..43e01f4a9 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -13,7 +13,7 @@ func flush(r *Runner, t *transaction) error { Runner: r, goal: t, goalKeys: make(map[docKey]bool), - queue: make(map[docKey][]token), + queue: make(map[docKey][]tokenAndId), debugId: debugPrefix(), } for _, dkey := range f.goal.docKeys() { @@ -26,10 +26,36 @@ type flusher struct { *Runner goal *transaction goalKeys map[docKey]bool - queue map[docKey][]token + queue map[docKey][]tokenAndId debugId string } +type tokenAndId struct { + tt token + bid bson.ObjectId +} + +func (ti tokenAndId) id() bson.ObjectId { + return ti.bid +} + +func (ti tokenAndId) nonce() string { + return ti.tt.nonce() +} + +func (ti tokenAndId) String() string { + return string(ti.tt) +} + +func tokensWithIds(q []token) []tokenAndId { + out := make([]tokenAndId, len(q)) + for i, tt := range q { + out[i].tt = tt + out[i].bid = tt.id() + } + return out +} + func (f *flusher) run() (err error) { if chaosEnabled { defer f.handleChaos(&err) @@ -37,7 +63,8 @@ func (f *flusher) run() (err error) { f.debugf("Processing %s", f.goal) seen := make(map[bson.ObjectId]*transaction) - if err := f.recurse(f.goal, seen); err != nil { + preloaded := make(map[bson.ObjectId]*transaction) + if err := f.recurse(f.goal, seen, preloaded); err != nil { return err } if f.goal.done() { @@ -129,26 +156,54 @@ func (f *flusher) run() (err error) { return nil } -func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error { +const preloadBatchSize = 100 + +func (f 
*flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t + delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } for _, dkey := range t.docKeys() { + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } - qt, err := f.load(id) - if err != nil { - return err + toPreload[id] = struct{}{} + remaining = append(remaining, id) + } + // done with this map + toPreload = nil + for len(remaining) > 0 { + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] } - err = f.recurse(qt, seen) + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) if err != nil { return err } + for _, id := range batch { + if seen[id] != nil { + continue + } + qt, ok := preloaded[id] + if !ok { + qt, err = f.load(id) + if err != nil { + return err + } + } + err = f.recurse(qt, seen, preloaded) + if err != nil { + return err + } + } } } return nil @@ -245,10 +300,20 @@ NextDoc: change.Upsert = false chaos("") if _, err := cquery.Apply(change, &info); err == nil { + if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength { + // abort with TXN Queue too long, but remove the entry we just added + innerErr := c.UpdateId(dkey.Id, + bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}}) + if innerErr != nil { + f.debugf("error while backing out of queue-too-long: %v", innerErr) + } + return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)", + dkey.Id, dkey.C, len(info.Queue)) + } if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. 
revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc } else { @@ -310,7 +375,7 @@ NextDoc: f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -452,7 +517,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: @@ -516,12 +581,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 { func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) { found = true + ttId := tt.id() NextDoc: for _, dkey := range dkeys { for _, dtt := range f.queue[dkey] { - if dtt == tt { + if dtt.tt == tt { continue NextDoc - } else if dtt.id() != tt.id() { + } else if dtt.id() != ttId { prereqs = true } } @@ -909,17 +975,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err return nil } -func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token { +func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token { var result []token for j := len(dqueue) - 1; j >= 0; j-- { dtt := dqueue[j] - if dtt == dontPull { + if dtt.tt == dontPull { continue } if _, ok := pull[dtt.id()]; ok { // It was handled before and this is a leftover invalid // nonce in the queue. Cherry-pick it out. 
- result = append(result, dtt) + result = append(result, dtt.tt) } } return result diff --git a/txn/txn.go b/txn/txn.go index d9a9f9657..8b44c8339 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -217,11 +217,14 @@ const ( // A Runner applies operations as part of a transaction onto any number // of collections within a database. See the Run method for details. type Runner struct { - tc *mgo.Collection // txns - sc *mgo.Collection // stash - lc *mgo.Collection // log + tc *mgo.Collection // txns + sc *mgo.Collection // stash + lc *mgo.Collection // log + opts RunnerOptions // runtime options } +const defaultMaxTxnQueueLength = 1000 + // NewRunner returns a new transaction runner that uses tc to hold its // transactions. // @@ -233,7 +236,36 @@ type Runner struct { // will be used for implementing the transactional behavior of insert // and remove operations. func NewRunner(tc *mgo.Collection) *Runner { - return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil} + return &Runner{ + tc: tc, + sc: tc.Database.C(tc.Name + ".stash"), + lc: nil, + opts: DefaultRunnerOptions(), + } +} + +// RunnerOptions encapsulates ways you can tweak transaction Runner behavior. +type RunnerOptions struct { + // MaxTxnQueueLength is a way to limit bad behavior. Many operations on + // transaction queues are O(N^2), and transaction queues growing too large + // are usually indicative of a bug in behavior. This should be larger + // than the maximum number of concurrent operations to a single document. + // Normal operations are likely to only ever hit 10 or so, we use a default + // maximum length of 1000. + MaxTxnQueueLength int +} + +// SetOptions allows people to change some of the internal behavior of a Runner. +func (r *Runner) SetOptions(opts RunnerOptions) { + r.opts = opts +} + +// DefaultRunnerOptions defines default behavior for a Runner. +// Users can use the DefaultRunnerOptions to only override specific behavior. 
+func DefaultRunnerOptions() RunnerOptions { + return RunnerOptions{ + MaxTxnQueueLength: defaultMaxTxnQueueLength, + } } var ErrAborted = fmt.Errorf("transaction aborted") @@ -460,6 +492,26 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) { return &t, nil } +func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*transaction) error { + txns := make([]transaction, 0, len(ids)) + + query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}}) + // Not sure that this actually has much of an effect when using All() + query.Batch(len(ids)) + err := query.All(&txns) + if err == mgo.ErrNotFound { + return fmt.Errorf("could not find a transaction in batch: %v", ids) + } else if err != nil { + return err + } + for i := range txns { + t := &txns[i] + preloaded[t.Id] = t + } + return nil +} + + type typeNature int const ( diff --git a/txn/txn_test.go b/txn/txn_test.go index ce9d138e2..8b85986b5 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -621,6 +621,90 @@ func (s *S) TestTxnQueueStashStressTest(c *C) { } } +func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) { + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + Breakpoint: "set-applying", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < expectedQueueLength; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + // Now that we've filled up the queue, we should see that there are 1000 + // items in the queue, and the error applying a new one will change. 
+ var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) + err = s.runner.Run(ops, "", nil) + c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`) + // The txn-queue should not have grown + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) +} + +func (s *S) TestTxnQueueDefaultMaxSize(c *C) { + s.runner.SetOptions(txn.DefaultRunnerOptions()) + s.checkTxnQueueLength(c, 1000) +} + +func (s *S) TestTxnQueueCustomMaxSize(c *C) { + opts := txn.DefaultRunnerOptions() + opts.MaxTxnQueueLength = 100 + s.runner.SetOptions(opts) + s.checkTxnQueueLength(c, 100) +} + +func (s *S) TestTxnQueueUnlimited(c *C) { + opts := txn.DefaultRunnerOptions() + // A value of 0 should mean 'unlimited' + opts.MaxTxnQueueLength = 0 + s.runner.SetOptions(opts) + // it isn't possible to actually prove 'unlimited' but we can prove that + // we at least can insert more than the default number of transactions + // without getting a 'too many transactions' failure. 
+ txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + // Use set-prepared because we are adding more transactions than + // other tests, and this speeds up setup time a bit + Breakpoint: "set-prepared", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < 1100; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100) + err = s.runner.Run(ops, "", nil) + c.Check(err, Equals, txn.ErrChaos) + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101) +} + func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { // This test ensures that PurgeMissing can handle very large // txn-queue fields. Previous iterations of PurgeMissing would @@ -703,6 +787,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { } var flaky = flag.Bool("flaky", false, "Include flaky tests") +var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests") + func (s *S) TestTxnQueueStressTest(c *C) { // This fails about 20% of the time on Mongo 3.2 (I haven't tried @@ -776,3 +862,117 @@ func (s *S) TestTxnQueueStressTest(c *C) { } } } + +type txnQueue struct { + Queue []string `bson:"txn-queue"` +} + +func (s *S) TestTxnQueueAssertionGrowth(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0}) + c.Assert(err, IsNil) + // Create many assertion only transactions. 
+ t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Assert: M{"balance": 0}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, IsNil) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + txn.SetChaos(txn.Chaos{}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.Run(ops, "", nil) + c.Logf("%8.3fs to clear N=%d assertions and add one more txn", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueueBrokenPrepared(c *C) { + txn.SetDebug(false) // too much spam + badTxnToken := "123456789012345678901234_deadbeef" + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}}) + c.Assert(err, IsNil) + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + errString := `cannot find transaction ObjectIdHex("123456789012345678901234")` + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err.Error(), Equals, errString) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1) + c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.ResumeAll() + c.Assert(err, IsNil) + c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, 
IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueuePreparing(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}}) + c.Assert(err, IsNil) + t := time.Now() + txn.SetChaos(txn.Chaos{ + KillChance: 1.0, + Breakpoint: "set-prepared", + }) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength) + txn.SetChaos(txn.Chaos{}) + t = time.Now() + err = s.runner.ResumeAll() + c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + expectedCount := 100 + if *txnQueueLength <= expectedCount { + expectedCount = *txnQueueLength - 1 + } + c.Check(len(qdoc.Queue), Equals, expectedCount) +} +