forked from mongodb/mongo-ruby-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmap_reduce.rb
344 lines (307 loc) · 11.6 KB
/
map_reduce.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
  class Collection
    class View

      # Provides behavior around a map/reduce operation on the collection
      # view.
      #
      # The chainable option methods (#finalize, #js_mode, #out, #scope,
      # #verbose) delegate to +configure+ (from Immutable). Per their
      # documented return values they return a new MapReduce when given a
      # value, and the current option value otherwise.
      #
      # @since 2.0.0
      class MapReduce
        extend Forwardable
        include Enumerable
        include Immutable
        include Loggable
        include Retryable

        # The inline option.
        #
        # @since 2.1.0
        INLINE = 'inline'.freeze

        # Reroute message.
        #
        # @since 2.1.0
        # @deprecated
        REROUTE = 'Rerouting the MapReduce operation to the primary server.'.freeze

        # Keys of a hash +out+ option whose value names the output
        # collection, in the order #out_collection_name checks them.
        OUT_ACTIONS = [ :replace, :merge, :reduce ].freeze

        # @return [ View ] view The collection view.
        attr_reader :view

        # @return [ String ] map The map function.
        attr_reader :map_function

        # @return [ String ] reduce The reduce function.
        attr_reader :reduce_function

        # Delegate necessary operations to the view.
        def_delegators :view, :collection, :read, :cluster, :timeout_ms

        # Delegate necessary operations to the collection.
        def_delegators :collection, :database, :client

        # Iterate through documents returned by the map/reduce.
        #
        # Every call sends the map/reduce command again, starting from the
        # primary selected for the session. When the output is not inline,
        # a follow-up find query reads the results back from the output
        # collection.
        #
        # @example Iterate through the result of the map/reduce.
        #   map_reduce.each do |document|
        #     p document
        #   end
        #
        # @return [ Enumerator ] The enumerator.
        #
        # @since 2.0.0
        #
        # @yieldparam [ Hash ] Each matching document.
        def each
          @cursor = nil
          session = client.get_session(@options)
          server = cluster.next_primary(nil, session)
          context = Operation::Context.new(client: client, session: session, operation_timeouts: view.operation_timeouts)
          if server.load_balancer?
            # Connection will be checked in when cursor is drained.
            connection = server.pool.check_out(context: context)
            result = send_initial_query_with_connection(connection, context.session, context: context)
            result = send_fetch_query_with_connection(connection, session) unless inline?
          else
            result = send_initial_query(server, context)
            result = send_fetch_query(server, session) unless inline?
          end
          @cursor = Cursor.new(view, result, server, session: session)
          if block_given?
            @cursor.each do |doc|
              yield doc
            end
          else
            @cursor.to_enum
          end
        end

        # Set or get the finalize function for the operation.
        #
        # @example Set the finalize function.
        #   map_reduce.finalize(function)
        #
        # @param [ String ] function The finalize js function.
        #
        # @return [ MapReduce, String ] The new MapReduce operation or the
        #   value of the function.
        #
        # @since 2.0.0
        def finalize(function = nil)
          configure(:finalize, function)
        end

        # Initialize the map/reduce for the provided collection view, functions
        # and options.
        #
        # The map and reduce functions are duplicated and frozen, and the
        # options are wrapped in a frozen BSON::Document. A deprecation
        # warning is logged through the client on every construction.
        #
        # @example Create the new map/reduce view.
        #   MapReduce.new(view, map, reduce, out: { inline: 1 })
        #
        # @param [ Collection::View ] view The collection view.
        # @param [ String ] map The map function.
        # @param [ String ] reduce The reduce function.
        # @param [ Hash ] options The map/reduce options.
        #
        # @since 2.0.0
        def initialize(view, map, reduce, options = {})
          @view = view
          @map_function = map.dup.freeze
          @reduce_function = reduce.dup.freeze
          @options = BSON::Document.new(options).freeze
          client.log_warn('The map_reduce operation is deprecated, please use the aggregation pipeline instead')
        end

        # Set or get the jsMode flag for the operation.
        #
        # @example Set js mode for the operation.
        #   map_reduce.js_mode(true)
        #
        # @param [ true, false ] value The jsMode value.
        #
        # @return [ MapReduce, true, false ] The new MapReduce operation or the
        #   value of the jsMode flag.
        #
        # @since 2.0.0
        def js_mode(value = nil)
          configure(:js_mode, value)
        end

        # Set or get the output location for the operation.
        #
        # @example Set the output to inline.
        #   map_reduce.out(inline: 1)
        #
        # @example Set the output collection to merge.
        #   map_reduce.out(merge: 'users')
        #
        # @example Set the output collection to replace.
        #   map_reduce.out(replace: 'users')
        #
        # @example Set the output collection to reduce.
        #   map_reduce.out(reduce: 'users')
        #
        # @param [ Hash ] location The output location details.
        #
        # @return [ MapReduce, Hash ] The new MapReduce operation or the value
        #   of the output location.
        #
        # @since 2.0.0
        def out(location = nil)
          configure(:out, location)
        end

        # Returns the collection name where the map-reduce result is written to.
        #
        # For a hash +out+ option, this is the value under the first of
        # +:replace+, +:merge+ and +:reduce+ that is set. A non-hash +out+
        # option (e.g. a collection name string) is returned as-is, and nil
        # is returned when no +out+ option is set.
        #
        # NOTE: when a hash +out+ option has none of those keys (for example
        # inline output, +{ inline: 1 }+), the +out+ hash itself is returned
        # rather than nil, because the lookup falls back to +options[:out]+.
        # Within this class the name is only used for non-inline output
        # (see #fetch_query_op).
        #
        # @return [ String, Hash, nil ] The output collection name.
        def out_collection_name
          out_option = options[:out]
          if out_option.respond_to?(:keys)
            action = OUT_ACTIONS.find { |key| out_option[key] }
            collection_name = out_option[action]
          end
          collection_name || out_option
        end

        # Returns the database name where the map-reduce result is written to.
        #
        # Uses the +:db+ entry of a hash +out+ option when present, and this
        # collection's database name otherwise. Returns nil when no +out+
        # option is set.
        #
        # @return [ String, nil ] The output database name.
        def out_database_name
          out_option = options[:out]
          return unless out_option

          if out_option.respond_to?(:keys) && (db = out_option[:db])
            db
          else
            database.name
          end
        end

        # Set or get a scope on the operation.
        #
        # @example Set the scope value.
        #   map_reduce.scope(value: 'test')
        #
        # @param [ Hash ] object The scope object.
        #
        # @return [ MapReduce, Hash ] The new MapReduce operation or the value
        #   of the scope.
        #
        # @since 2.0.0
        def scope(object = nil)
          configure(:scope, object)
        end

        # Whether to include the timing information in the result.
        #
        # @example Set the verbose value.
        #   map_reduce.verbose(false)
        #
        # @param [ true, false ] value Whether to include timing information
        #   in the result.
        #
        # @return [ MapReduce, Hash ] The new MapReduce operation or the value
        #   of the verbose option.
        #
        # @since 2.0.5
        def verbose(value = nil)
          configure(:verbose, value)
        end

        # Execute the map reduce, without doing a fetch query to retrieve the
        # results if outputted to a collection.
        #
        # The command is sent through +nro_write_with_retry+ with the view's
        # write concern for the session. The operation context carries the
        # view's operation timeouts, matching what #each does, so the view's
        # timeout settings apply here as well.
        #
        # @example Execute the map reduce and get the raw result.
        #   map_reduce.execute
        #
        # @return [ Mongo::Operation::Result ] The raw map reduce result
        #
        # @since 2.5.0
        def execute
          view.send(:with_session, @options) do |session|
            write_concern = view.write_concern_with_session(session)
            context = Operation::Context.new(
              client: client,
              session: session,
              operation_timeouts: view.operation_timeouts
            )
            # Use the context yielded by the retry helper; it is named
            # distinctly so it does not shadow the outer +context+ local.
            nro_write_with_retry(write_concern, context: context) do |connection, _txn_num, attempt_context|
              send_initial_query_with_connection(connection, session, context: attempt_context)
            end
          end
        end

        private

        # Server selector of the underlying view, read via +send+ so it does
        # not need to be public on the view.
        def server_selector
          @view.send(:server_selector)
        end

        # Whether results are returned inline. This is true when no +out+
        # option is set, or when it equals +{ inline: 1 }+ (symbol or string
        # key).
        def inline?
          out.nil? || out == { inline: 1 } || out == { INLINE => 1 }
        end

        # Builds the map/reduce command specification, including the session.
        def map_reduce_spec(session = nil)
          Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).specification
        end

        # Builds a new MapReduce with the same view and functions but the
        # given options.
        def new(options)
          MapReduce.new(view, map_function, reduce_function, options)
        end

        # Builds the initial map/reduce operation for the given session.
        def initial_query_op(session)
          spec = map_reduce_spec(session)
          # Read preference isn't simply passed in the command payload
          # (it may need to be converted to wire protocol flags).
          # Passing it in command payload produces errors on at least
          # 5.0 mongoses.
          # In the future map_reduce_command should remove :read
          # from its return value, however we cannot do this right now
          # due to Mongoid 7 relying on :read being returned as part of
          # the command - see RUBY-2932.
          # Delete :read here for now because it cannot be sent to mongos this way.
          spec = spec.dup
          spec[:selector] = spec[:selector].dup
          spec[:selector].delete(:read)
          Operation::MapReduce.new(spec)
        end

        # Whether the server described by +description+ may run the command.
        # Inline output may run anywhere. Otherwise only a standalone, mongos,
        # primary or load balancer is accepted.
        def valid_server?(description)
          if secondary_ok?
            true
          else
            description.standalone? || description.mongos? || description.primary? || description.load_balancer?
          end
        end

        # True when +out+ is a hash whose first key, case-insensitively, is
        # "inline".
        def secondary_ok?
          out.respond_to?(:keys) && out.keys.first.to_s.downcase == INLINE
        end

        # Sends the initial map/reduce command on a connection checked out
        # from +server+.
        def send_initial_query(server, context)
          server.with_connection do |connection|
            send_initial_query_with_connection(connection, context.session, context: context)
          end
        end

        # Sends the initial map/reduce command on +connection+. If the
        # connection's server is not suitable (see #valid_server?), a warning
        # is logged and the command is executed on the cluster's primary
        # instead.
        def send_initial_query_with_connection(connection, session, context:)
          op = initial_query_op(session)
          if valid_server?(connection.description)
            op.execute_with_connection(connection, context: context)
          else
            msg = "Rerouting the MapReduce operation to the primary server - #{connection.address} is not suitable because it is not currently the primary"
            log_warn(msg)
            server = cluster.next_primary(nil, session)
            op.execute(server, context: context)
          end
        end

        # Query specification for reading results back from the output
        # collection.
        def fetch_query_spec
          Builder::MapReduce.new(map_function, reduce_function, view, options).query_specification
        end

        # Find command specification for reading results back, including the
        # session.
        def find_command_spec(session)
          Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).command_specification
        end

        # Builds a find operation over the output collection and database.
        # It uses an empty filter and the view's read preference. Read concern
        # and collation come from the options first, falling back to the
        # collection or view.
        def fetch_query_op(session)
          spec = {
            coll_name: out_collection_name,
            db_name: out_database_name,
            filter: {},
            session: session,
            read: read,
            read_concern: options[:read_concern] || collection.read_concern,
            collation: options[:collation] || view.options[:collation],
          }
          Operation::Find.new(spec)
        end

        # Runs the fetch query against +server+.
        def send_fetch_query(server, session)
          fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
        end

        # Runs the fetch query on an already checked-out +connection+.
        def send_fetch_query_with_connection(connection, session)
          fetch_query_op(
            session
          ).execute_with_connection(
            connection,
            context: Operation::Context.new(client: client, session: session)
          )
        end
      end
    end
  end
end