Skip to content

RUBY-1813 Discard ServerSessions involved in network errors #2825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
# @sdam_flow_lock covers just the sdam flow. Note it does not apply
# to @topology replacements which are done under @update_lock.
@sdam_flow_lock = Mutex.new
Session::SessionPool.create(self)
@session_pool = Session::SessionPool.new(self)

if seeds.empty? && load_balanced?
raise ArgumentError, 'Load-balanced clusters with no seeds are prohibited'
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ def inspect
def insert_one(document, opts = {})
QueryCache.clear_namespace(namespace)

client.send(:with_session, opts) do |session|
client.with_session(opts) do |session|
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
Expand Down
70 changes: 43 additions & 27 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/error'

module Mongo
module Operation

Expand All @@ -30,40 +32,42 @@ def do_execute(connection, context, options = {})
session&.materialize_if_needed
unpin_maybe(session, connection) do
add_error_labels(connection, context) do
add_server_diagnostics(connection) do
get_result(connection, context, options).tap do |result|
if session
if session.in_transaction? &&
connection.description.load_balancer?
then
if session.pinned_connection_global_id
unless session.pinned_connection_global_id == connection.global_id
raise(
Error::InternalDriverError,
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
)
check_for_network_error do
add_server_diagnostics(connection) do
get_result(connection, context, options).tap do |result|
if session
if session.in_transaction? &&
connection.description.load_balancer?
then
if session.pinned_connection_global_id
unless session.pinned_connection_global_id == connection.global_id
raise(
Error::InternalDriverError,
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
)
end
else
session.pin_to_connection(connection.global_id)
connection.pin
end
else
session.pin_to_connection(connection.global_id)
connection.pin
end
end

if session.snapshot? && !session.snapshot_timestamp
session.snapshot_timestamp = result.snapshot_timestamp
if session.snapshot? && !session.snapshot_timestamp
session.snapshot_timestamp = result.snapshot_timestamp
end
end
end

if result.has_cursor_id? &&
connection.description.load_balancer?
then
if result.cursor_id == 0
connection.unpin
else
connection.pin
if result.has_cursor_id? &&
connection.description.load_balancer?
then
if result.cursor_id == 0
connection.unpin
else
connection.pin
end
end
process_result(result, connection)
end
process_result(result, connection)
end
end
end
Expand Down Expand Up @@ -144,6 +148,18 @@ def process_result_for_sdam(result, connection)
connection.server.scan_semaphore.signal
end
end

# Error classes that check_for_network_error rescues. When one of these
# escapes the yielded block, the operation's session (if any) is marked
# dirty before the error is re-raised.
NETWORK_ERRORS = [
Error::SocketError,
Error::SocketTimeoutError
].freeze

# Runs the given block and watches for network-level failures. If the
# block raises one of the NETWORK_ERRORS, the current session (when there
# is one) is flagged dirty and the same exception is propagated to the
# caller untouched.
#
# @return [ Object ] whatever the block returns when no error is raised.
def check_for_network_error
  begin
    yield
  rescue *NETWORK_ERRORS
    active_session = session
    active_session.dirty! unless active_session.nil?
    # Bare raise re-raises the exception currently being handled.
    raise
  end
end
end
end
end
48 changes: 23 additions & 25 deletions lib/mongo/operation/shared/response_handling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,33 @@ def validate_result(result, connection, context)
# the operation is performed.
# @param [ Mongo::Operation::Context ] context The operation context.
def add_error_labels(connection, context)
begin
yield
rescue Mongo::Error::SocketError => e
if context.in_transaction? && !context.committing_transaction?
e.add_label('TransientTransactionError')
end
if context.committing_transaction?
e.add_label('UnknownTransactionCommitResult')
end
yield
rescue Mongo::Error::SocketError => e
if context.in_transaction? && !context.committing_transaction?
e.add_label('TransientTransactionError')
end
if context.committing_transaction?
e.add_label('UnknownTransactionCommitResult')
end

maybe_add_retryable_write_error_label!(e, connection, context)

raise e
rescue Mongo::Error::SocketTimeoutError => e
maybe_add_retryable_write_error_label!(e, connection, context)
raise e
rescue Mongo::Error::OperationFailure => e
if context.committing_transaction?
if e.write_retryable? || e.wtimeout? || (e.write_concern_error? &&
!Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code)
) || e.max_time_ms_expired?
e.add_label('UnknownTransactionCommitResult')
end
maybe_add_retryable_write_error_label!(e, connection, context)

raise e
rescue Mongo::Error::SocketTimeoutError => e
maybe_add_retryable_write_error_label!(e, connection, context)
raise e
rescue Mongo::Error::OperationFailure => e
if context.committing_transaction?
if e.write_retryable? || e.wtimeout? || (e.write_concern_error? &&
!Session::UNLABELED_WRITE_CONCERN_CODES.include?(e.write_concern_error_code)
) || e.max_time_ms_expired?
e.add_label('UnknownTransactionCommitResult')
end
end

maybe_add_retryable_write_error_label!(e, connection, context)
maybe_add_retryable_write_error_label!(e, connection, context)

raise e
end
raise e
end

# Unpins the session and/or the connection if the yielded to block
Expand Down
17 changes: 17 additions & 0 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ def snapshot?
# @since 2.5.0
attr_reader :operation_time

# Marks (or un-marks) the underlying server session as dirty. When this
# session has no server session, the call is a no-op and returns nil.
#
# @param [ true | false ] mark the dirty state to assign to the server
#   session (defaults to true).
def dirty!(mark = true)
  server_session = @server_session
  return if server_session.nil?

  server_session.dirty!(mark)
end

# Reports the dirty state of the underlying server session.
#
# @return [ true | false | nil ] the server session's dirty state, or nil
#   when this session has no server session.
#
# @api private
def dirty?
  server_session = @server_session
  return nil if server_session.nil?

  server_session.dirty?
end

# @return [ Hash ] The options for the transaction currently being executed
# on this session.
#
Expand Down
3 changes: 3 additions & 0 deletions lib/mongo/session/server_session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/session/server_session/dirtyable'

module Mongo

class Session
Expand All @@ -25,6 +27,7 @@ class Session
#
# @since 2.5.0
class ServerSession
include Dirtyable

# Regex for removing dashes from the UUID string.
#
Expand Down
52 changes: 52 additions & 0 deletions lib/mongo/session/server_session/dirtyable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

# Copyright (C) 2024 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Session
    class ServerSession
      # Functionality for manipulating and querying a session's
      # "dirty" state, per the last paragraph at
      # https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#server-session-pool
      #
      # If a driver has a server session pool and a network error is
      # encountered when executing any command with a ClientSession, the
      # driver MUST mark the associated ServerSession as dirty. Dirty server
      # sessions are discarded when returned to the server session pool. It is
      # valid for a dirty session to be used for subsequent commands (e.g. an
      # implicit retry attempt, a later command in a bulk write, or a later
      # operation on an explicit session), however, it MUST remain dirty for
      # the remainder of its lifetime regardless if later commands succeed.
      #
      # @api private
      module Dirtyable
        # Query whether the server session has been marked dirty or not.
        #
        # A server session that has never been marked reports false rather
        # than nil, so the result is always a strict boolean as documented.
        #
        # @return [ true | false ] the server session's dirty state
        def dirty?
          # @dirty is unset (nil) until #dirty! is first called; coerce so
          # callers never observe nil from a server session.
          @dirty ? true : false
        end

        # Mark the server session as dirty (the default) or clean.
        #
        # @param [ true | false ] mark whether to mark the server session
        #   dirty or not.
        #
        # @return [ true | false ] the value of mark that was stored.
        def dirty!(mark = true)
          @dirty = mark
        end
      end
    end
  end
end
30 changes: 12 additions & 18 deletions lib/mongo/session/session_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,6 @@ class Session
#
# @since 2.5.0
class SessionPool

# Create a SessionPool.
#
# @example
# SessionPool.create(cluster)
#
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
# session pool.
#
# @since 2.5.0
def self.create(cluster)
pool = new(cluster)
cluster.instance_variable_set(:@session_pool, pool)
end

# Initialize a SessionPool.
#
# @example
Expand Down Expand Up @@ -105,9 +90,7 @@ def checkin(session)

@mutex.synchronize do
prune!
unless about_to_expire?(session)
@queue.unshift(session)
end
@queue.unshift(session) if return_to_queue?(session)
end
end

Expand Down Expand Up @@ -136,6 +119,17 @@ def end_sessions

private

# Decides whether a checked-in session may go back onto the pool's
# queue. A dirty session is rejected outright; otherwise the session is
# accepted only if it is not about to expire.
#
# @param [ Session::ServerSession ] session the session being checked in
#
# @return [ true | false ] true when the session should be re-queued.
def return_to_queue?(session)
  # Dirty sessions are rejected before any expiry check is made.
  return false if session.dirty?

  !about_to_expire?(session)
end

def about_to_expire?(session)
if session.nil?
raise ArgumentError, 'session cannot be nil'
Expand Down
17 changes: 1 addition & 16 deletions spec/mongo/session/session_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@
end
end

describe '.create' do

let!(:pool) do
described_class.create(cluster)
end

it 'creates a session pool' do
expect(pool).to be_a(Mongo::Session::SessionPool)
end

it 'adds the pool as an instance variable on the cluster' do
expect(cluster.session_pool).to eq(pool)
end
end

describe '#initialize' do

let(:pool) do
Expand Down Expand Up @@ -181,7 +166,7 @@
describe '#end_sessions' do

let(:pool) do
described_class.create(client.cluster)
client.cluster.session_pool
end

let!(:session_a) do
Expand Down
8 changes: 3 additions & 5 deletions spec/runners/unified/support_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,15 @@ def assert_session_dirty(op)
consume_test_runner(op)
use_arguments(op) do |args|
session = entities.get(:session, args.use!('session'))
# https://jira.mongodb.org/browse/RUBY-1813
true
session.dirty? || raise(Error::ResultMismatch, 'expected session to be dirty')
end
end

# Test-runner operation asserting that a session entity is NOT dirty.
#
# Looks up the session entity named by the operation's 'session' argument
# and raises Error::ResultMismatch if its #dirty? is truthy.
#
# @param op the operation description; it is handed to
#   consume_test_runner and use_arguments (presumably to validate the
#   target and extract arguments - verify against those helpers).
def assert_session_not_dirty(op)
consume_test_runner(op)
use_arguments(op) do |args|
session = entities.get(:session, args.use!('session'))
# https://jira.mongodb.org/browse/RUBY-1813
# NOTE(review): the bare `true` below has no effect, since the assertion
# on the next line follows it; it looks like a leftover placeholder.
true
session.dirty? && raise(Error::ResultMismatch, 'expected session to be not dirty')
end
end

Expand All @@ -92,7 +90,7 @@ def assert_same_lsid_on_last_two_commands(op, expected: true)
unless subscriber.started_events.length >= 2
raise Error::ResultMismatch, "Must have at least 2 events, have #{subscriber.started_events.length}"
end
lsids = subscriber.started_events[-2...-1].map do |cmd|
lsids = subscriber.started_events[-2..-1].map do |cmd|
cmd.command.fetch('lsid')
end
if expected
Expand Down
Loading