Skip to content

RUBY-3463 Fix cursor behaviour on load balanced #2893

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gemfiles/standard.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def standard_dependencies
gem 'tilt'
# solargraph depends on rbs, which won't build on jruby for some reason
gem 'solargraph', platforms: :mri
gem 'ruby-lsp', platforms: :mri
end

gem 'libmongocrypt-helper', '~> 1.8.0' if ENV['FLE'] == 'helper'
Expand Down
7 changes: 6 additions & 1 deletion lib/mongo/cluster/reapers/cursor_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,12 @@ def kill_cursors
server_api: server.options[:server_api],
connection_global_id: kill_spec.connection_global_id,
}
op.execute(server, context: Operation::Context.new(options: options))
if connection = kill_spec.connection
op.execute_with_connection(connection, context: Operation::Context.new(options: options))
connection.connection_pool.check_in(connection)
else
op.execute(server, context: Operation::Context.new(options: options))
end

if session = kill_spec.session
if session.implicit?
Expand Down
14 changes: 13 additions & 1 deletion lib/mongo/collection/view/aggregation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,26 @@ def effective_read_preference(connection)
end

def send_initial_query(server, context)
server.with_connection do |connection|
if server.load_balancer?
# Connection will be checked in when cursor is drained.
connection = server.pool.check_out(context: context)
initial_query_op(
context.session,
effective_read_preference(connection)
).execute_with_connection(
connection,
context: context
)
else
server.with_connection do |connection|
initial_query_op(
context.session,
effective_read_preference(connection)
).execute_with_connection(
connection,
context: context
)
end
end
end
end
Expand Down
9 changes: 8 additions & 1 deletion lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,14 @@ def initial_query_op(session)
end

def send_initial_query(server, context)
initial_query_op(context.session).execute(server, context: context)
operation = initial_query_op(context.session)
if server.load_balancer?
# Connection will be checked in when cursor is drained.
connection = server.pool.check_out(context: context)
operation.execute_with_connection(connection, context: context)
else
operation.execute(server, context: context)
end
end

def use_query_cache?
Expand Down
24 changes: 20 additions & 4 deletions lib/mongo/collection/view/map_reduce.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,15 @@ def each
session = client.get_session(@options)
server = cluster.next_primary(nil, session)
context = Operation::Context.new(client: client, session: session, operation_timeouts: view.operation_timeouts)
result = send_initial_query(server, context)
result = send_fetch_query(server, session) unless inline?
if server.load_balancer?
# Connection will be checked in when cursor is drained.
connection = server.pool.check_out(context: context)
result = send_initial_query_with_connection(connection, context.session, context: context)
result = send_fetch_query_with_connection(connection, session) unless inline?
else
result = send_initial_query(server, context)
result = send_fetch_query(server, session) unless inline?
end
@cursor = Cursor.new(view, result, server, session: session)
if block_given?
@cursor.each do |doc|
Expand Down Expand Up @@ -306,7 +313,7 @@ def find_command_spec(session)
Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).command_specification
end

def fetch_query_op(server, session)
def fetch_query_op(session)
spec = {
coll_name: out_collection_name,
db_name: out_database_name,
Expand All @@ -320,7 +327,16 @@ def fetch_query_op(server, session)
end

def send_fetch_query(server, session)
fetch_query_op(server, session).execute(server, context: Operation::Context.new(client: client, session: session))
fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
end

def send_fetch_query_with_connection(connection, session)
fetch_query_op(
session
).execute_with_connection(
connection,
context: Operation::Context.new(client: client, session: session)
)
end
end
end
Expand Down
8 changes: 7 additions & 1 deletion lib/mongo/collection/view/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,13 @@ def parallel_scan(cursor_count, options = {})
session: session,
connection_global_id: result.connection_global_id,
)
result = op.execute(server, context: context)
result = if server.load_balancer?
# Connection will be checked in when cursor is drained.
connection = server.pool.check_out(context: context)
op.execute_with_connection(connection, context: context)
else
op.execute(server, context: context)
end
Cursor.new(self, result, server, session: session)
end
end
Expand Down
48 changes: 43 additions & 5 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,19 @@ def initialize(view, result, server, options = {})
@context = @options[:context]&.with(connection_global_id: connection_global_id_for_context) || fresh_context
@explicitly_closed = false
@lock = Mutex.new
unless closed?
if server.load_balancer?
# We need the connection in the cursor only in load balanced topology;
# we do not need an additional reference to it otherwise.
@connection = @initial_result.connection
end
if closed?
check_in_connection
else
register
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(@connection_global_id),
cluster))
ObjectSpace.define_finalizer(
self,
self.class.finalize(kill_spec(@connection_global_id), cluster)
)
end
end

Expand All @@ -104,6 +113,9 @@ def initialize(view, result, server, options = {})
# @api private
attr_reader :initial_result

# @api private
attr_reader :connection

# Finalize the cursor for garbage collection. Schedules this cursor to be included
# in a killCursors operation executed by the Cluster's CursorReaper.
#
Expand Down Expand Up @@ -315,6 +327,7 @@ def close(opts = {})
@lock.synchronize do
@explicitly_closed = true
end
check_in_connection
end

# Get the parsed collection name.
Expand Down Expand Up @@ -395,6 +408,7 @@ def kill_spec(connection_global_id)
connection_global_id: connection_global_id,
server_address: server.address,
session: @session,
connection: @connection
)
end

Expand Down Expand Up @@ -464,7 +478,10 @@ def process(result)
# the @cursor_id may be zero (all results fit in the first batch).
# Thus we need to check both @cursor_id and the cursor_id of the result
# prior to calling unregister here.
unregister if !closed? && result.cursor_id == 0
if !closed? && result.cursor_id == 0
unregister
check_in_connection
end
@cursor_id = set_cursor_id(result)

if result.respond_to?(:post_batch_resume_token)
Expand Down Expand Up @@ -496,7 +513,12 @@ def unregister
end

def execute_operation(op, context: nil)
op.execute(@server, context: context || possibly_refreshed_context)
op_context = context || possibly_refreshed_context
if @connection.nil?
op.execute(@server, context: op_context)
else
op.execute_with_connection(@connection, context: op_context)
end
end

# Considers the timeout mode and will either return the cursor's
Expand Down Expand Up @@ -545,6 +567,22 @@ def connection_global_id_for_context
@connection_global_id
end
end

# Returns the connection that was used to create the cursor back to the
# corresponding connection pool.
#
# In a load balanced topology cursors must use the same connection for the
# initial and all subsequent operations. Therefore, the connection is not
# checked into the pool after the initial operation is completed, but
# only when the cursor is drained.
def check_in_connection
# Connection nil means the connection has been already checked in.
return if @connection.nil?
return unless @connection.server.load_balancer?

@connection.connection_pool.check_in(@connection)
@connection = nil
end
end
end

Expand Down
7 changes: 5 additions & 2 deletions lib/mongo/cursor/kill_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,25 @@ def initialize(
db_name:,
connection_global_id:,
server_address:,
session:
session:,
connection: nil
)
@cursor_id = cursor_id
@coll_name = coll_name
@db_name = db_name
@connection_global_id = connection_global_id
@server_address = server_address
@session = session
@connection = connection
end

attr_reader :cursor_id,
:coll_name,
:db_name,
:connection_global_id,
:server_address,
:session
:session,
:connection

def ==(other)
cursor_id == other.cursor_id &&
Expand Down
19 changes: 14 additions & 5 deletions lib/mongo/database/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,20 @@ def send_initial_query(server, session, context, options = {})
if opts.key?(:deserialize_as_bson)
execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson)
end
initial_query_op(session, opts).execute(
server,
context: context,
options: execution_opts
)
if server.load_balancer?
connection = server.pool.check_out(context: context)
initial_query_op(session, opts).execute_with_connection(
connection,
context: context,
options: execution_opts
)
else
initial_query_op(session, opts).execute(
server,
context: context,
options: execution_opts
)
end
end
end
end
Expand Down
7 changes: 6 additions & 1 deletion lib/mongo/index/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,12 @@ def normalize_models(models, server)
end

def send_initial_query(server, session, context)
initial_query_op(session).execute(server, context: context)
if server.load_balancer?
connection = server.pool.check_out(context: context)
initial_query_op(session).execute_with_connection(connection, context: context)
else
initial_query_op(session).execute(server, context: context)
end
end
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/mongo/operation/result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class Result
# when this result was produced.
#
# @api private
def initialize(replies, connection_description = nil, connection_global_id = nil, context: nil)
def initialize(replies, connection_description = nil, connection_global_id = nil, context: nil, connection: nil)
@context = context

if replies
Expand All @@ -122,6 +122,7 @@ def initialize(replies, connection_description = nil, connection_global_id = nil
@replies = [ reply ]
@connection_description = connection_description
@connection_global_id = connection_global_id
@connection = connection
end
end

Expand All @@ -148,6 +149,8 @@ def initialize(replies, connection_description = nil, connection_global_id = nil
# @api private
attr_reader :context

attr_reader :connection

# @api private
def_delegators :parser,
:not_master?, :node_recovering?, :node_shutting_down?
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def result_class
end

def get_result(connection, context, options = {})
result_class.new(*dispatch_message(connection, context, options), context: context)
result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection)
end

# Returns a Protocol::Message or nil as reply.
Expand Down
Loading
Loading