Skip to content

Commit 71edba3

Browse files
GODRIVER-2828 Use topology version from Server instead of Connection in ProcessError. (#1252)
Co-authored-by: Preston Vasquez <[email protected]>
1 parent cf14ffe commit 71edba3

File tree

3 files changed

+439
-181
lines changed

3 files changed

+439
-181
lines changed

x/mongo/driver/driver.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,6 @@ const (
158158
ConnectionPoolCleared
159159
)
160160

161-
// ServerChanged returns true if the ProcessErrorResult indicates that the server changed from an SDAM perspective
162-
// during a ProcessError() call.
163-
func (p ProcessErrorResult) ServerChanged() bool {
164-
return p != NoChange
165-
}
166-
167161
// ErrorProcessor implementations can handle processing errors, which may modify their internal state.
168162
// If this type is implemented by a Server, then Operation.Execute will call it's ProcessError
169163
// method after it decodes a wire message.

x/mongo/driver/topology/server.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
)
2525

2626
const minHeartbeatInterval = 500 * time.Millisecond
27+
const wireVersion42 = 8 // Wire version for MongoDB 4.2
2728

2829
// Server state constants.
2930
const (
@@ -295,6 +296,8 @@ func (s *Server) ProcessHandshakeError(err error, startingGenerationNumber uint6
295296
return
296297
}
297298

299+
// Unwrap any connection errors. If there is no wrapped connection error, then the error should
300+
// not result in any Server state change (e.g. a command error from the database).
298301
wrappedConnErr := unwrapConnectionError(err)
299302
if wrappedConnErr == nil {
300303
return
@@ -385,27 +388,58 @@ func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bo
385388

386389
// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
387390
func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
388-
// ignore nil error
391+
// Ignore nil errors.
389392
if err == nil {
390393
return driver.NoChange
391394
}
392395

396+
// Ignore errors from stale connections because the error came from a previous generation of the
397+
// connection pool. The root cause of the error has aleady been handled, which is what caused
398+
// the pool generation to increment. Processing errors for stale connections could result in
399+
// handling the same error root cause multiple times (e.g. a temporary network interrupt causing
400+
// all connections to the same server to return errors).
401+
if conn.Stale() {
402+
return driver.NoChange
403+
}
404+
393405
// Must hold the processErrorLock while updating the server description and clearing the pool.
394406
// Not holding the lock leads to possible out-of-order processing of pool.clear() and
395407
// pool.ready() calls from concurrent server description updates.
396408
s.processErrorLock.Lock()
397409
defer s.processErrorLock.Unlock()
398410

399-
// ignore stale error
400-
if conn.Stale() {
401-
return driver.NoChange
411+
// Get the wire version and service ID from the connection description because they will never
412+
// change for the lifetime of a connection and can possibly be different between connections to
413+
// the same server.
414+
connDesc := conn.Description()
415+
wireVersion := connDesc.WireVersion
416+
serviceID := connDesc.ServiceID
417+
418+
// Get the topology version from the Server description because the Server description is
419+
// updated by heartbeats and errors, so typically has a more up-to-date topology version.
420+
serverDesc := s.desc.Load().(description.Server)
421+
topologyVersion := serverDesc.TopologyVersion
422+
423+
// We don't currently update the Server topology version when we create new application
424+
// connections, so it's possible for a connection's topology version to be newer than the
425+
// Server's topology version. Pick the "newest" of the two topology versions.
426+
// Technically a nil topology version on a new database response should be considered a new
427+
// topology version and replace the Server's topology version. However, we don't know if the
428+
// connection's topology version is based on a new or old database response, so we ignore a nil
429+
// topology version on the connection for now.
430+
//
431+
// TODO(GODRIVER-2841): Remove this logic once we set the Server description when we create
432+
// TODO application connections because then the Server's topology version will always be the
433+
// TODO latest known.
434+
if tv := connDesc.TopologyVersion; tv != nil && topologyVersion.CompareToIncoming(tv) < 0 {
435+
topologyVersion = tv
402436
}
437+
403438
// Invalidate server description if not primary or node recovering error occurs.
404439
// These errors can be reported as a command error or a write concern error.
405-
desc := conn.Description()
406440
if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotPrimary()) {
407-
// ignore stale error
408-
if desc.TopologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
441+
// Ignore errors that came from when the database was on a previous topology version.
442+
if topologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
409443
return driver.NoChange
410444
}
411445

@@ -415,16 +449,16 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE
415449

416450
res := driver.ServerMarkedUnknown
417451
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
418-
if cerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
452+
if cerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
419453
res = driver.ConnectionPoolCleared
420-
s.pool.clear(err, desc.ServiceID)
454+
s.pool.clear(err, serviceID)
421455
}
422456

423457
return res
424458
}
425459
if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
426-
// ignore stale error
427-
if desc.TopologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
460+
// Ignore errors that came from when the database was on a previous topology version.
461+
if topologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
428462
return driver.NoChange
429463
}
430464

@@ -434,9 +468,9 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE
434468

435469
res := driver.ServerMarkedUnknown
436470
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
437-
if wcerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
471+
if wcerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
438472
res = driver.ConnectionPoolCleared
439-
s.pool.clear(err, desc.ServiceID)
473+
s.pool.clear(err, serviceID)
440474
}
441475
return res
442476
}
@@ -458,7 +492,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE
458492
// monitoring check. The check is cancelled last to avoid a post-cancellation reconnect racing with
459493
// updateDescription.
460494
s.updateDescription(description.NewServerFromError(s.address, err, nil))
461-
s.pool.clear(err, desc.ServiceID)
495+
s.pool.clear(err, serviceID)
462496
s.cancelCheck()
463497
return driver.ConnectionPoolCleared
464498
}

0 commit comments

Comments
 (0)