From a41cf00109fac98fedd9ca080e1006cb1deb5366 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Mon, 12 Aug 2024 10:52:29 +0200 Subject: [PATCH 1/5] RUBY-3463 Fix cursors on load balanced --- lib/mongo/collection/view/aggregation.rb | 14 ++++++++- lib/mongo/collection/view/iterable.rb | 9 +++++- lib/mongo/collection/view/map_reduce.rb | 26 ++++++++++++---- lib/mongo/collection/view/readable.rb | 8 ++++- lib/mongo/cursor.rb | 38 ++++++++++++++++++++++-- lib/mongo/database/view.rb | 19 ++++++++---- lib/mongo/index/view.rb | 7 ++++- lib/mongo/operation/result.rb | 5 +++- lib/mongo/operation/shared/executable.rb | 2 +- 9 files changed, 109 insertions(+), 19 deletions(-) diff --git a/lib/mongo/collection/view/aggregation.rb b/lib/mongo/collection/view/aggregation.rb index 9b190a56b0..154b50488a 100644 --- a/lib/mongo/collection/view/aggregation.rb +++ b/lib/mongo/collection/view/aggregation.rb @@ -118,7 +118,9 @@ def effective_read_preference(connection) end def send_initial_query(server, context) - server.with_connection do |connection| + if server.load_balancer? + # Connection will be checked in when cursor is drained. + connection = server.pool.check_out(context: context) initial_query_op( context.session, effective_read_preference(connection) @@ -126,6 +128,16 @@ def send_initial_query(server, context) connection, context: context ) + else + server.with_connection do |connection| + initial_query_op( + context.session, + effective_read_preference(connection) + ).execute_with_connection( + connection, + context: context + ) + end end end end diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index c35a23d559..99133c5e9f 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -168,7 +168,14 @@ def initial_query_op(session) end def send_initial_query(server, context) - initial_query_op(context.session).execute(server, context: context) + operation = initial_query_op(context.session) + if server.load_balancer? + # Connection will be checked in when cursor is drained. + connection = server.pool.check_out(context: context) + operation.execute_with_connection(connection, context: context) + else + operation.execute(server, context: context) + end end def use_query_cache? diff --git a/lib/mongo/collection/view/map_reduce.rb b/lib/mongo/collection/view/map_reduce.rb index 386227708c..ab0b761ba6 100644 --- a/lib/mongo/collection/view/map_reduce.rb +++ b/lib/mongo/collection/view/map_reduce.rb @@ -73,8 +73,15 @@ def each session = client.get_session(@options) server = cluster.next_primary(nil, session) context = Operation::Context.new(client: client, session: session, operation_timeouts: view.operation_timeouts) - result = send_initial_query(server, context) - result = send_fetch_query(server, session) unless inline? + if server.load_balancer? + # Connection will be checked in when cursor is drained. + connection = server.pool.check_out(context: context) + result = send_initial_query_with_connection(connection, context.session, context: context) + result = send_fetch_query_with_connection(connection, session) unless inline? + else + result = send_initial_query(server, context) + result = send_fetch_query(server, session) unless inline? + end @cursor = Cursor.new(view, result, server, session: session) if block_given? @cursor.each do |doc| @@ -306,7 +313,7 @@ def find_command_spec(session) Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).command_specification end - def fetch_query_op(server, session) + def fetch_query_op(session) spec = { coll_name: out_collection_name, db_name: out_database_name, @@ -319,8 +326,17 @@ def fetch_query_op(server, session) Operation::Find.new(spec) end - def send_fetch_query(server, session) - fetch_query_op(server, session).execute(server, context: Operation::Context.new(client: client, session: session)) + def send_fetch_query(session) + fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session)) + end + + def send_fetch_query_with_connection(connection, session) + fetch_query_op( + session + ).execute_with_connection( + connection, + context: Operation::Context.new(client: client, session: session) + ) end end end diff --git a/lib/mongo/collection/view/readable.rb b/lib/mongo/collection/view/readable.rb index 58f7a380ab..05fcc78df7 100644 --- a/lib/mongo/collection/view/readable.rb +++ b/lib/mongo/collection/view/readable.rb @@ -736,7 +736,13 @@ def parallel_scan(cursor_count, options = {}) session: session, connection_global_id: result.connection_global_id, ) - result = op.execute(server, context: context) + result = if server.load_balancer? + # Connection will be checked in when cursor is drained. + connection = server.pool.check_out(context: context) + op.execute_with_connection(connection, context: context) + else + op.execute(server, context: context) + end Cursor.new(self, result, server, session: session) end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 618d30d032..e0456e7277 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -91,7 +91,14 @@ def initialize(view, result, server, options = {}) @context = @options[:context]&.with(connection_global_id: connection_global_id_for_context) || fresh_context @explicitly_closed = false @lock = Mutex.new - unless closed? + if server.load_balancer? + # We need the connection in the cursor only in load balanced topology; + # we do not need an additional reference to it otherwise. + @connection = @initial_result.connection + end + if closed? + check_in_connection + else register ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(@connection_global_id), cluster)) @@ -315,6 +322,7 @@ def close(opts = {}) @lock.synchronize do @explicitly_closed = true end + check_in_connection end # Get the parsed collection name. @@ -464,7 +472,10 @@ def process(result) # the @cursor_id may be zero (all results fit in the first batch). # Thus we need to check both @cursor_id and the cursor_id of the result # prior to calling unregister here. - unregister if !closed? && result.cursor_id == 0 + if !closed? && result.cursor_id == 0 + unregister + check_in_connection + end @cursor_id = set_cursor_id(result) if result.respond_to?(:post_batch_resume_token) @@ -496,7 +507,12 @@ def unregister end def execute_operation(op, context: nil) - op.execute(@server, context: context || possibly_refreshed_context) + op_context = context || possibly_refreshed_context + if @connection.nil? + op.execute(@server, context: op_context) + else + op.execute_with_connection(@connection, context: op_context) + end end # Considers the timeout mode and will either return the cursor's @@ -545,6 +561,22 @@ def connection_global_id_for_context @connection_global_id end end + + # Returns the connection that was used to create the cursor back to the + # corresponding connection pool. + # + # In a load balanced topology cursors must use the same connection for the + # initial and all subsequent operations. Therefore, the connection is not + # checked into the pool after the initial operation is completed, but + # only when the cursor is drained. + def check_in_connection + # Connection nil means the connection has been already checked in. + return if @connection.nil? + return unless @connection.server.load_balancer? + + @connection.connection_pool.check_in(@connection) + @connection = nil + end end end diff --git a/lib/mongo/database/view.rb b/lib/mongo/database/view.rb index 883e041d47..3c0bcaaf89 100644 --- a/lib/mongo/database/view.rb +++ b/lib/mongo/database/view.rb @@ -288,11 +288,20 @@ def send_initial_query(server, session, context, options = {}) if opts.key?(:deserialize_as_bson) execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson) end - initial_query_op(session, opts).execute( - server, - context: context, - options: execution_opts - ) + if server.load_balancer? + connection = server.pool.check_out(context: context) + initial_query_op(session, opts).execute_with_connection( + connection, + context: context, + options: execution_opts + ) + else + initial_query_op(session, opts).execute( + server, + context: context, + options: execution_opts + ) + end end end end diff --git a/lib/mongo/index/view.rb b/lib/mongo/index/view.rb index f915de360e..4e8c41b742 100644 --- a/lib/mongo/index/view.rb +++ b/lib/mongo/index/view.rb @@ -404,7 +404,12 @@ def normalize_models(models, server) end def send_initial_query(server, session, context) - initial_query_op(session).execute(server, context: context) + if server.load_balancer? + connection = server.pool.check_out(context: context) + initial_query_op(session).execute_with_connection(connection, context: context) + else + initial_query_op(session).execute(server, context: context) + end end end end diff --git a/lib/mongo/operation/result.rb b/lib/mongo/operation/result.rb index 9508432ccb..42de5d5720 100644 --- a/lib/mongo/operation/result.rb +++ b/lib/mongo/operation/result.rb @@ -104,7 +104,7 @@ class Result # when this result was produced. # # @api private - def initialize(replies, connection_description = nil, connection_global_id = nil, context: nil) + def initialize(replies, connection_description = nil, connection_global_id = nil, context: nil, connection: nil) @context = context if replies @@ -122,6 +122,7 @@ def initialize(replies, connection_description = nil, connection_global_id = nil @replies = [ reply ] @connection_description = connection_description @connection_global_id = connection_global_id + @connection = connection end end @@ -148,6 +149,8 @@ def initialize(replies, connection_description = nil, connection_global_id = nil # @api private attr_reader :context + attr_reader :connection + # @api private def_delegators :parser, :not_master?, :node_recovering?, :node_shutting_down? diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 19e98013de..041e4d1e5b 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -104,7 +104,7 @@ def result_class end def get_result(connection, context, options = {}) - result_class.new(*dispatch_message(connection, context, options), context: context) + result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection) end # Returns a Protocol::Message or nil as reply. From cc834f5678a0f1ad924aba737c85a93d80e6d125 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Mon, 12 Aug 2024 11:23:32 +0200 Subject: [PATCH 2/5] Additional fixes --- lib/mongo/collection/view/map_reduce.rb | 2 +- spec/mongo/query_cache_spec.rb | 459 ++++++++++++------------ 2 files changed, 236 insertions(+), 225 deletions(-) diff --git a/lib/mongo/collection/view/map_reduce.rb b/lib/mongo/collection/view/map_reduce.rb index ab0b761ba6..d484f65c6e 100644 --- a/lib/mongo/collection/view/map_reduce.rb +++ b/lib/mongo/collection/view/map_reduce.rb @@ -80,7 +80,7 @@ def each result = send_fetch_query_with_connection(connection, session) unless inline? else result = send_initial_query(server, context) - result = send_fetch_query(server, session) unless inline? + result = send_fetch_query(session) unless inline? end @cursor = Cursor.new(view, result, server, session: session) if block_given? diff --git a/spec/mongo/query_cache_spec.rb b/spec/mongo/query_cache_spec.rb index a39e259676..f5dd875307 100644 --- a/spec/mongo/query_cache_spec.rb +++ b/spec/mongo/query_cache_spec.rb @@ -168,257 +168,268 @@ allow(view).to receive(:limit) { nil } end - context 'when there is no entry in the cache' do - it 'returns nil' do - expect(Mongo::QueryCache.get(**options)).to be_nil - end - end - - context 'when there is an entry in the cache' do - before do - Mongo::QueryCache.set(caching_cursor, **caching_cursor_options) - end - - context 'when that entry has no limit' do - let(:caching_cursor_options) do - { - namespace: 'db.coll', - selector: { field: 'value' }, - } - end - - let(:query_options) do - caching_cursor_options.merge(limit: limit) - end - - context 'when the query has a limit' do - let(:limit) { 5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'when the query has a limit but negative' do - let(:limit) { -5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'when the query has no limit' do - let(:limit) { nil } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'when the query has a 0 limit' do - let(:limit) { 0 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - end - - context 'when that entry has a 0 limit' do - let(:caching_cursor_options) do - { - namespace: 'db.coll', - selector: { field: 'value' }, - limit: 0, - } - end - - let(:query_options) do - caching_cursor_options.merge(limit: limit) - end - - before do - allow(view).to receive(:limit) { 0 } - end - - context 'when the query has a limit' do - let(:limit) { 5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'when the query has a limit but negative' do - let(:limit) { -5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - - context 'when the query has no limit' do - let(:limit) { nil } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'when the query has a 0 limit' do - let(:limit) { 0 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - end - - context 'when that entry has a limit' do - let(:caching_cursor_options) do - { - namespace: 'db.coll', - selector: { field: 'value' }, - limit: 5, - } - end - - let(:query_options) do - caching_cursor_options.merge(limit: limit) - end - + [true, false].each do |load_balancer| + context "when load_balancer is #{load_balancer}" do before do - allow(view).to receive(:limit) { 5 } - end - - context 'and the new query has a smaller limit' do - let(:limit) { 4 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'and the new query has a smaller limit but negative' do - let(:limit) { -4 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + allow(server).to receive(:load_balancer?) { load_balancer } + if load_balancer + allow(result).to receive(:connection) { nil } end end - context 'and the new query has a larger limit' do - let(:limit) { 6 } - - it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil - end - end - - context 'and the new query has a larger limit but negative' do - let(:limit) { -6 } - + context 'when there is no entry in the cache' do it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil + expect(Mongo::QueryCache.get(**options)).to be_nil end end - context 'and the new query has the same limit' do - let(:limit) { 5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end - - context 'and the new query has the same limit but negative' do - let(:limit) { -5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + context 'when there is an entry in the cache' do + before do + Mongo::QueryCache.set(caching_cursor, **caching_cursor_options) end - end - context 'and the new query has no limit' do - let(:limit) { nil } - - it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil + context 'when that entry has no limit' do + let(:caching_cursor_options) do + { + namespace: 'db.coll', + selector: { field: 'value' }, + } + end + + let(:query_options) do + caching_cursor_options.merge(limit: limit) + end + + context 'when the query has a limit' do + let(:limit) { 5 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'when the query has a limit but negative' do + let(:limit) { -5 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'when the query has no limit' do + let(:limit) { nil } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'when the query has a 0 limit' do + let(:limit) { 0 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end end - end - context 'and the new query has a 0 limit' do - let(:limit) { 0 } + context 'when that entry has a 0 limit' do + let(:caching_cursor_options) do + { + namespace: 'db.coll', + selector: { field: 'value' }, + limit: 0, + } + end - it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil - end - end - end + let(:query_options) do + caching_cursor_options.merge(limit: limit) + end - context 'when that entry has a negative limit' do - let(:caching_cursor_options) do - { - namespace: 'db.coll', - selector: { field: 'value' }, - limit: -5, - } - end + before do + allow(view).to receive(:limit) { 0 } + end - let(:query_options) do - caching_cursor_options.merge(limit: limit) - end + context 'when the query has a limit' do + let(:limit) { 5 } - before do - allow(view).to receive(:limit) { -5 } - end + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end - context 'and the new query has a smaller limit' do - let(:limit) { 4 } + context 'when the query has a limit but negative' do + let(:limit) { -5 } - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end - context 'and the new query has a larger limit' do - let(:limit) { 6 } - it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil - end - end + context 'when the query has no limit' do + let(:limit) { nil } - context 'and the new query has the same negative limit' do - let(:limit) { -5 } - - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) - end - end + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end - context 'and the new query has the same positive limit' do - let(:limit) { 5 } + context 'when the query has a 0 limit' do + let(:limit) { 0 } - it 'returns the caching cursor' do - expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end end - end - - context 'and the new query has no limit' do - let(:limit) { nil } - it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil + context 'when that entry has a limit' do + let(:caching_cursor_options) do + { + namespace: 'db.coll', + selector: { field: 'value' }, + limit: 5, + } + end + + let(:query_options) do + caching_cursor_options.merge(limit: limit) + end + + before do + allow(view).to receive(:limit) { 5 } + end + + context 'and the new query has a smaller limit' do + let(:limit) { 4 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has a smaller limit but negative' do + let(:limit) { -4 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has a larger limit' do + let(:limit) { 6 } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end + + context 'and the new query has a larger limit but negative' do + let(:limit) { -6 } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end + + context 'and the new query has the same limit' do + let(:limit) { 5 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has the same limit but negative' do + let(:limit) { -5 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has no limit' do + let(:limit) { nil } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end + + context 'and the new query has a 0 limit' do + let(:limit) { 0 } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end end - end - - context 'and the new query has a 0 limit' do - let(:limit) { 0 } - it 'returns nil' do - expect(Mongo::QueryCache.get(**query_options)).to be_nil + context 'when that entry has a negative limit' do + let(:caching_cursor_options) do + { + namespace: 'db.coll', + selector: { field: 'value' }, + limit: -5, + } + end + + let(:query_options) do + caching_cursor_options.merge(limit: limit) + end + + before do + allow(view).to receive(:limit) { -5 } + end + + context 'and the new query has a smaller limit' do + let(:limit) { 4 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has a larger limit' do + let(:limit) { 6 } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end + + context 'and the new query has the same negative limit' do + let(:limit) { -5 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has the same positive limit' do + let(:limit) { 5 } + + it 'returns the caching cursor' do + expect(Mongo::QueryCache.get(**query_options)).to eq(caching_cursor) + end + end + + context 'and the new query has no limit' do + let(:limit) { nil } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end + + context 'and the new query has a 0 limit' do + let(:limit) { 0 } + + it 'returns nil' do + expect(Mongo::QueryCache.get(**query_options)).to be_nil + end + end end end end From 2526fb6ab7e541d16307e72d8cd451797b47f5ae Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Tue, 13 Aug 2024 18:20:11 +0200 Subject: [PATCH 3/5] Update specs for lb --- lib/mongo/collection/view/map_reduce.rb | 4 +- lib/mongo/cursor.rb | 3 + spec/integration/cursor_pinning_spec.rb | 75 +++++-------------------- spec/mongo/cursor_spec.rb | 23 ++++++-- 4 files changed, 38 insertions(+), 67 deletions(-) diff --git a/lib/mongo/collection/view/map_reduce.rb b/lib/mongo/collection/view/map_reduce.rb index d484f65c6e..43ef179421 100644 --- a/lib/mongo/collection/view/map_reduce.rb +++ b/lib/mongo/collection/view/map_reduce.rb @@ -80,7 +80,7 @@ def each result = send_fetch_query_with_connection(connection, session) unless inline? else result = send_initial_query(server, context) - result = send_fetch_query(session) unless inline? + result = send_fetch_query(server, session) unless inline? end @cursor = Cursor.new(view, result, server, session: session) if block_given? @@ -326,7 +326,7 @@ def fetch_query_op(session) Operation::Find.new(spec) end - def send_fetch_query(session) + def send_fetch_query(server, session) fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session)) end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index e0456e7277..869a5ca2b6 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -111,6 +111,9 @@ def initialize(view, result, server, options = {}) # @api private attr_reader :initial_result + # @api private + attr_reader :connection + # Finalize the cursor for garbage collection. Schedules this cursor to be included # in a killCursors operation executed by the Cluster's CursorReaper. # diff --git a/spec/integration/cursor_pinning_spec.rb b/spec/integration/cursor_pinning_spec.rb index b20c11d2a8..a5ade03814 100644 --- a/spec/integration/cursor_pinning_spec.rb +++ b/spec/integration/cursor_pinning_spec.rb @@ -52,73 +52,28 @@ context 'lb' do require_topology :load_balanced - # In load-balanced topology, we cannot create new connections to a - # particular service. + # In load-balanced topology, a cursor retains the connection used to create + # it until the cursor is closed. - context 'when no connection is available' do + context 'when connection is available' do + require_multi_mongos - it 'raises ConnectionCheckOutTimeout' do - server.pool.size.should == 0 + let(:client) { authorized_client.with(max_pool_size: 2) } + it 'does not return connection to the pool if cursor not drained' do + expect(server.pool).not_to receive(:check_in) enum = collection.find({}, batch_size: 1).to_enum - # Still zero because we haven't iterated - server.pool.size.should == 0 - + # Get the first element only; cursor is not drained, so there should + # be no check_in of the connection. enum.next - server.pool.size.should == 1 - - # Grab the connection that was used - server.with_connection do - # This requires a new connection, but we cannot make one. - lambda do - enum.next - end.should raise_error(Mongo::Error::ConnectionCheckOutTimeout) - - server.pool.size.should == 1 - end end - end - - context 'when connection is available' do - require_multi_mongos - - let(:client) { authorized_client.with(max_pool_size: 4) } - - it 'uses the available connection' do - server.pool.size.should == 0 - - # Create 4 connections. - - enums = [] - connections = [] - connection_ids = [] - - 4.times do - view = collection.find({}, batch_size: 1) - enum = view.to_enum - - enum.next - - enums << enum - connection_ids << view.cursor.initial_result.connection_global_id - connections << server.pool.check_out - end - - connection_ids.uniq.length.should be > 1 - - server.pool.size.should == 4 - - connections.each do |c| - server.pool.check_in(c) - end - # At this point, in theory, all connections are equally likely to - # be chosen, but we have cursors referencing more than one - # distinct service. - # Iterate each cursor to ensure they all continue to work. - enums.each do |enum| - enum.next - end + it 'returns connection to the pool when cursor is drained' do + view = collection.find({}, batch_size: 1) + enum = view.to_enum + expect_any_instance_of(Mongo::Cursor).to receive(:check_in_connection) + # Drain the cursor + enum.each { |it| it.nil? } end end end diff --git a/spec/mongo/cursor_spec.rb b/spec/mongo/cursor_spec.rb index 0b4b3a8cb8..9aac3655b6 100644 --- a/spec/mongo/cursor_spec.rb +++ b/spec/mongo/cursor_spec.rb @@ -174,7 +174,11 @@ before do expect(cursor).to receive(:get_more_operation).and_return(op).ordered - expect(op).to receive(:execute).and_raise(Mongo::Error::SocketError).ordered + if SpecConfig.instance.connect_options[:connect] == :load_balanced + expect(op).to receive(:execute_with_connection).and_raise(Mongo::Error::SocketError).ordered + else + expect(op).to receive(:execute).and_raise(Mongo::Error::SocketError).ordered + end end it 'raises the error' do @@ -621,6 +625,9 @@ allow(reply).to receive(:connection_description).and_return(conn_desc) allow(reply).to receive(:cursor_id).and_return(42) allow(reply).to receive(:connection_global_id).and_return(1) + if SpecConfig.instance.connect_options[:connect] == :load_balanced + allow(reply).to receive(:connection).and_return(nil) + end end end @@ -774,10 +781,16 @@ it 'does not raise an error' do cursor - server.with_connection do |conn| - expect(conn).to receive(:deliver) - .at_least(:once) - .and_raise(Mongo::Error::SocketError, "test error") + if SpecConfig.instance.connect_options[:connect] == :load_balanced + expect(cursor.connection).to receive(:deliver) + .at_least(:once) + .and_raise(Mongo::Error::SocketError, "test error") + else + server.with_connection do |conn| + expect(conn).to receive(:deliver) + .at_least(:once) + .and_raise(Mongo::Error::SocketError, "test error") + end end expect do cursor.close From 5cc14a1345fee80e6343b5787de6dfec30cfefc7 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Wed, 14 Aug 2024 12:53:26 +0200 Subject: [PATCH 4/5] Implement reaping for lb --- gemfiles/standard.rb | 1 + lib/mongo/cluster/reapers/cursor_reaper.rb | 7 ++++++- lib/mongo/cursor.rb | 7 +++++-- lib/mongo/cursor/kill_spec.rb | 7 +++++-- spec/integration/cursor_reaping_spec.rb | 2 +- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/gemfiles/standard.rb b/gemfiles/standard.rb index f29b91cb4b..64f2a55a52 100644 --- a/gemfiles/standard.rb +++ b/gemfiles/standard.rb @@ -65,6 +65,7 @@ def standard_dependencies gem 'tilt' # solargraph depends on rbs, which won't build on jruby for some reason gem 'solargraph', platforms: :mri + gem 'ruby-lsp', platforms: :mri end gem 'libmongocrypt-helper', '~> 1.8.0' if ENV['FLE'] == 'helper' diff --git a/lib/mongo/cluster/reapers/cursor_reaper.rb b/lib/mongo/cluster/reapers/cursor_reaper.rb index b08bab2017..beaf660c41 100644 --- a/lib/mongo/cluster/reapers/cursor_reaper.rb +++ b/lib/mongo/cluster/reapers/cursor_reaper.rb @@ -194,7 +194,12 @@ def kill_cursors server_api: server.options[:server_api], connection_global_id: kill_spec.connection_global_id, } - op.execute(server, context: Operation::Context.new(options: options)) + if connection = kill_spec.connection + op.execute_with_connection(connection, context: Operation::Context.new(options: options)) + connection.connection_pool.check_in(connection) + else + op.execute(server, context: Operation::Context.new(options: options)) + end if session = kill_spec.session if session.implicit? diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 869a5ca2b6..14a486a713 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -100,8 +100,10 @@ def initialize(view, result, server, options = {}) check_in_connection else register - ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(@connection_global_id), - cluster)) + ObjectSpace.define_finalizer( + self, + self.class.finalize(kill_spec(@connection_global_id), cluster) + ) end end @@ -406,6 +408,7 @@ def kill_spec(connection_global_id) connection_global_id: connection_global_id, server_address: server.address, session: @session, + connection: @connection ) end diff --git a/lib/mongo/cursor/kill_spec.rb b/lib/mongo/cursor/kill_spec.rb index 117f1b50fa..09047ba371 100644 --- a/lib/mongo/cursor/kill_spec.rb +++ b/lib/mongo/cursor/kill_spec.rb @@ -31,7 +31,8 @@ def initialize( db_name:, connection_global_id:, server_address:, - session: + session:, + connection: nil ) @cursor_id = cursor_id @coll_name = coll_name @@ -39,6 +40,7 @@ def initialize( @connection_global_id = connection_global_id @server_address = server_address @session = session + @connection = connection end attr_reader :cursor_id, @@ -46,7 +48,8 @@ def initialize( :db_name, :connection_global_id, :server_address, - :session + :session, + :connection def ==(other) cursor_id == other.cursor_id && diff --git a/spec/integration/cursor_reaping_spec.rb b/spec/integration/cursor_reaping_spec.rb index 876ae46a91..9bd8891378 100644 --- a/spec/integration/cursor_reaping_spec.rb +++ b/spec/integration/cursor_reaping_spec.rb @@ -24,7 +24,7 @@ let(:subscriber) { Mrss::EventSubscriber.new } let(:client) do - authorized_client.tap do |client| + authorized_client.with(max_pool_size: 10).tap do |client| client.subscribe(Mongo::Monitoring::COMMAND, subscriber) end end From 7fb41b5af3d1aa5d8f8595abe133f4d2342d8fdf Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Tue, 20 Aug 2024 08:17:21 +0200 Subject: [PATCH 5/5] Skip on serverless --- spec/spec_tests/data/client_side_encryption/badQueries.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/spec_tests/data/client_side_encryption/badQueries.yml b/spec/spec_tests/data/client_side_encryption/badQueries.yml index 893622c4ce..bdd8627fb4 100644 --- a/spec/spec_tests/data/client_side_encryption/badQueries.yml +++ b/spec/spec_tests/data/client_side_encryption/badQueries.yml @@ -1,5 +1,6 @@ runOn: - minServerVersion: "4.1.10" + topology: [ "replicaset", "sharded" ] database_name: &database_name "default" collection_name: &collection_name "default" @@ -533,4 +534,4 @@ tests: filter: {} fieldName: "encrypted_w_altname" result: - errorContains: "The distinct key is not allowed to be marked for encryption with a non-UUID keyId" \ No newline at end of file + errorContains: "The distinct key is not allowed to be marked for encryption with a non-UUID keyId"