Skip to content

Commit ec4cc37

Browse files
authored
Add read concern configuration to Safe{} (globalsign#2)
* Add read concern configuration to Safe{} * Fix tests
1 parent 654eaee commit ec4cc37

File tree

3 files changed

+52
-13
lines changed

3 files changed

+52
-13
lines changed

Diff for: session.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -1889,6 +1889,7 @@ func (s *Session) SetPrefetch(p float64) {
18891889
type Safe struct {
18901890
W int // Min # of servers to ack before success
18911891
WMode string // Write mode for MongoDB 2.0+ (e.g. "majority")
1892+
RMode string // Read mode for MonogDB 3.2+ ("majority", "local", "linearizable")
18921893
WTimeout int // Milliseconds to wait for W before timing out
18931894
FSync bool // Sync via the journal if present, or via data files sync otherwise
18941895
J bool // Sync via the journal if present
@@ -1900,7 +1901,7 @@ func (s *Session) Safe() (safe *Safe) {
19001901
defer s.m.Unlock()
19011902
if s.safeOp != nil {
19021903
cmd := s.safeOp.query.(*getLastError)
1903-
safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J}
1904+
safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J, RMode: s.queryConfig.op.readConcern}
19041905
switch w := cmd.W.(type) {
19051906
case string:
19061907
safe.WMode = w
@@ -1980,6 +1981,7 @@ func (s *Session) Safe() (safe *Safe) {
19801981
//
19811982
// Relevant documentation:
19821983
//
1984+
// https://docs.mongodb.com/manual/reference/read-concern/
19831985
// http://www.mongodb.org/display/DOCS/getLastError+Command
19841986
// http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError
19851987
// http://www.mongodb.org/display/DOCS/Data+Center+Awareness
@@ -1998,6 +2000,7 @@ func (s *Session) SetSafe(safe *Safe) {
19982000
// That is:
19992001
//
20002002
// - safe.WMode is always used if set.
2003+
// - safe.RMode is always used if set.
20012004
// - safe.W is used if larger than the current W and WMode is empty.
20022005
// - safe.FSync is always used if true.
20032006
// - safe.J is used if FSync is false.
@@ -2036,6 +2039,13 @@ func (s *Session) ensureSafe(safe *Safe) {
20362039
w = safe.W
20372040
}
20382041

2042+
// Set the read concern
2043+
switch safe.RMode {
2044+
case "majority", "local", "linearizable":
2045+
s.queryConfig.op.readConcern = safe.RMode
2046+
default:
2047+
}
2048+
20392049
var cmd getLastError
20402050
if s.safeOp == nil {
20412051
cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J}
@@ -3284,7 +3294,9 @@ func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool {
32843294
Snapshot: op.options.Snapshot,
32853295
OplogReplay: op.flags&flagLogReplay != 0,
32863296
Collation: op.options.Collation,
3297+
ReadConcern: readLevel{level: op.readConcern},
32873298
}
3299+
32883300
if op.limit < 0 {
32893301
find.BatchSize = -op.limit
32903302
find.SingleBatch = true
@@ -3334,7 +3346,7 @@ type findCmd struct {
33343346
Comment string `bson:"comment,omitempty"`
33353347
MaxScan int `bson:"maxScan,omitempty"`
33363348
MaxTimeMS int `bson:"maxTimeMS,omitempty"`
3337-
ReadConcern interface{} `bson:"readConcern,omitempty"`
3349+
ReadConcern readLevel `bson:"readConcern,omitempty"`
33383350
Max interface{} `bson:"max,omitempty"`
33393351
Min interface{} `bson:"min,omitempty"`
33403352
ReturnKey bool `bson:"returnKey,omitempty"`
@@ -3348,6 +3360,12 @@ type findCmd struct {
33483360
Collation *Collation `bson:"collation,omitempty"`
33493361
}
33503362

3363+
// readLevel provides the nested "level: majority" serialisation needed for the
3364+
// query read concern.
3365+
type readLevel struct {
3366+
level string `bson:"level,omitempty"`
3367+
}
3368+
33513369
// getMoreCmd holds the command used for requesting more query results on MongoDB 3.2+.
33523370
//
33533371
// Relevant documentation:

Diff for: session_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -2781,6 +2781,7 @@ func (s *S) TestSafeSetting(c *C) {
27812781
safe := session.Safe()
27822782
c.Assert(safe.W, Equals, 0)
27832783
c.Assert(safe.WMode, Equals, "")
2784+
c.Assert(safe.RMode, Equals, "")
27842785
c.Assert(safe.WTimeout, Equals, 0)
27852786
c.Assert(safe.FSync, Equals, false)
27862787
c.Assert(safe.J, Equals, false)
@@ -2790,6 +2791,7 @@ func (s *S) TestSafeSetting(c *C) {
27902791
safe = session.Safe()
27912792
c.Assert(safe.W, Equals, 1)
27922793
c.Assert(safe.WMode, Equals, "")
2794+
c.Assert(safe.RMode, Equals, "")
27932795
c.Assert(safe.WTimeout, Equals, 2)
27942796
c.Assert(safe.FSync, Equals, true)
27952797
c.Assert(safe.J, Equals, false)
@@ -2799,6 +2801,7 @@ func (s *S) TestSafeSetting(c *C) {
27992801
safe = session.Safe()
28002802
c.Assert(safe.W, Equals, 0)
28012803
c.Assert(safe.WMode, Equals, "")
2804+
c.Assert(safe.RMode, Equals, "")
28022805
c.Assert(safe.WTimeout, Equals, 0)
28032806
c.Assert(safe.FSync, Equals, false)
28042807
c.Assert(safe.J, Equals, false)
@@ -2808,6 +2811,7 @@ func (s *S) TestSafeSetting(c *C) {
28082811
safe = session.Safe()
28092812
c.Assert(safe.W, Equals, 5)
28102813
c.Assert(safe.WMode, Equals, "")
2814+
c.Assert(safe.RMode, Equals, "")
28112815
c.Assert(safe.WTimeout, Equals, 6)
28122816
c.Assert(safe.FSync, Equals, false)
28132817
c.Assert(safe.J, Equals, true)
@@ -2817,6 +2821,7 @@ func (s *S) TestSafeSetting(c *C) {
28172821
safe = session.Safe()
28182822
c.Assert(safe.W, Equals, 5)
28192823
c.Assert(safe.WMode, Equals, "")
2824+
c.Assert(safe.RMode, Equals, "")
28202825
c.Assert(safe.WTimeout, Equals, 6)
28212826
c.Assert(safe.FSync, Equals, false)
28222827
c.Assert(safe.J, Equals, true)
@@ -2826,6 +2831,7 @@ func (s *S) TestSafeSetting(c *C) {
28262831
safe = session.Safe()
28272832
c.Assert(safe.W, Equals, 6)
28282833
c.Assert(safe.WMode, Equals, "")
2834+
c.Assert(safe.RMode, Equals, "")
28292835
c.Assert(safe.WTimeout, Equals, 4)
28302836
c.Assert(safe.FSync, Equals, true)
28312837
c.Assert(safe.J, Equals, false)
@@ -2835,6 +2841,17 @@ func (s *S) TestSafeSetting(c *C) {
28352841
safe = session.Safe()
28362842
c.Assert(safe.W, Equals, 0)
28372843
c.Assert(safe.WMode, Equals, "majority")
2844+
c.Assert(safe.RMode, Equals, "")
2845+
c.Assert(safe.WTimeout, Equals, 2)
2846+
c.Assert(safe.FSync, Equals, true)
2847+
c.Assert(safe.J, Equals, false)
2848+
2849+
// Read concern
2850+
session.EnsureSafe(&mgo.Safe{RMode: "majority"})
2851+
safe = session.Safe()
2852+
c.Assert(safe.W, Equals, 0)
2853+
c.Assert(safe.WMode, Equals, "majority")
2854+
c.Assert(safe.RMode, Equals, "majority")
28382855
c.Assert(safe.WTimeout, Equals, 2)
28392856
c.Assert(safe.FSync, Equals, true)
28402857
c.Assert(safe.J, Equals, false)
@@ -2844,6 +2861,7 @@ func (s *S) TestSafeSetting(c *C) {
28442861
safe = session.Safe()
28452862
c.Assert(safe.W, Equals, 0)
28462863
c.Assert(safe.WMode, Equals, "something")
2864+
c.Assert(safe.RMode, Equals, "majority")
28472865
c.Assert(safe.WTimeout, Equals, 2)
28482866
c.Assert(safe.FSync, Equals, true)
28492867
c.Assert(safe.J, Equals, false)
@@ -2853,6 +2871,7 @@ func (s *S) TestSafeSetting(c *C) {
28532871
safe = session.Safe()
28542872
c.Assert(safe.W, Equals, 0)
28552873
c.Assert(safe.WMode, Equals, "something")
2874+
c.Assert(safe.RMode, Equals, "majority")
28562875
c.Assert(safe.WTimeout, Equals, 2)
28572876
c.Assert(safe.FSync, Equals, true)
28582877
c.Assert(safe.J, Equals, false)
@@ -2863,6 +2882,7 @@ func (s *S) TestSafeSetting(c *C) {
28632882
clone.EnsureSafe(&mgo.Safe{WMode: "foo"})
28642883
safe = session.Safe()
28652884
c.Assert(safe.WMode, Equals, "something")
2885+
c.Assert(safe.RMode, Equals, "majority")
28662886
}
28672887

28682888
func (s *S) TestSafeInsert(c *C) {

Diff for: socket.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,18 @@ const (
6767
)
6868

6969
type queryOp struct {
70-
query interface{}
71-
collection string
72-
serverTags []bson.D
73-
selector interface{}
74-
replyFunc replyFunc
75-
mode Mode
76-
skip int32
77-
limit int32
78-
options queryWrapper
79-
hasOptions bool
80-
flags queryOpFlags
70+
query interface{}
71+
collection string
72+
serverTags []bson.D
73+
selector interface{}
74+
replyFunc replyFunc
75+
mode Mode
76+
skip int32
77+
limit int32
78+
options queryWrapper
79+
hasOptions bool
80+
flags queryOpFlags
81+
readConcern string
8182
}
8283

8384
type queryWrapper struct {

0 commit comments

Comments
 (0)