@@ -121,14 +121,42 @@ local function filter_names_fingerprint(filter)
121
121
return name_list_str
122
122
end
123
123
124
+ --- Get an index using parts tree built by @{build_index_parts_tree}.
125
+ ---
126
+ --- @tparam table node root of the prefix tree for certain collection
127
+ ---
128
+ --- @tparam table filter map of key-value to filter objects against
129
+ ---
130
+ --- @treturn string `index_name` or `nil` is found index
131
+ ---
132
+ --- @treturn number `len` is a number of index parts will be used at lookup
133
+ local function get_best_matched_index (node , filter )
134
+ local index_name = (node .index_names or {})[1 ]
135
+ local max_len = 0
136
+
137
+ for k , v in pairs (filter ) do
138
+ local successor_node = (node .successors or {})[k ]
139
+ if successor_node ~= nil then
140
+ local new_filter = table .copy (filter )
141
+ new_filter [k ] = nil
142
+ local branch_index_name , branch_len =
143
+ get_best_matched_index (successor_node , new_filter )
144
+ if branch_index_name ~= nil and branch_len > max_len then
145
+ index_name = branch_index_name
146
+ max_len = branch_len
147
+ end
148
+ end
149
+ end
150
+
151
+ return index_name , max_len + 1
152
+ end
153
+
124
154
-- XXX: raw idea: we can store field-to-field_no mapping when creating
125
155
-- `lookup_index_name` to faster form the value_list
126
156
127
157
--- Flatten filter values (transform to a list) against specific index to
128
158
--- passing it to index:pairs().
129
159
---
130
- --- Only full keys are supported for a compound index for now.
131
- ---
132
160
--- @tparam table self the data accessor
133
161
---
134
162
--- @tparam table filter filter for objects, its values will ordered to form
144
172
--- passed index
145
173
---
146
174
--- @treturn table `value_list` the value to pass to index:pairs()
175
+ ---
176
+ --- @treturn table `new_filter` the `filter` value w/o values extracted to
177
+ --- `value_list`
147
178
local function flatten_filter (self , filter , collection_name , index_name )
148
179
assert (type (self ) == ' table' ,
149
180
' self must be a table, got ' .. type (self ))
@@ -153,6 +184,7 @@ local function flatten_filter(self, filter, collection_name, index_name)
153
184
' index_name must be a string, got ' .. type (index_name ))
154
185
155
186
local value_list = {}
187
+ local new_filter = table .copy (filter )
156
188
157
189
-- fill value_list
158
190
local index_meta = self .indexes [collection_name ][index_name ]
@@ -163,6 +195,7 @@ local function flatten_filter(self, filter, collection_name, index_name)
163
195
local value = filter [field_name ]
164
196
if value == nil then break end
165
197
value_list [# value_list + 1 ] = value
198
+ new_filter [field_name ] = nil
166
199
end
167
200
168
201
-- check for correctness: non-empty value_list
@@ -172,26 +205,11 @@ local function flatten_filter(self, filter, collection_name, index_name)
172
205
json .encode (filter ), index_name ))
173
206
end
174
207
175
- -- check for correctness: all filter fields are used
176
- local count = 0
177
- for k , v in pairs (filter ) do
178
- count = count + 1
179
- end
180
- if count ~= # value_list then -- avoid extra json.encode()
181
- assert (count ~= # value_list ,
182
- (' filter items count does not match index fields count: ' ..
183
- ' filter: %s, index_name: %s' ):format (json .encode (filter ),
184
- index_name ))
185
- end
186
-
187
- local full_match = # value_list == # index_meta .fields
188
- return full_match , value_list
208
+ local full_match = # value_list == # index_meta .fields and
209
+ next (new_filter ) == nil
210
+ return full_match , value_list , new_filter
189
211
end
190
212
191
- -- XXX: support partial match for primary/secondary indexes and support to skip
192
- -- fields to get an index (full_match must be false in the case because
193
- -- returned items will be additionally filtered after unflatten).
194
-
195
213
--- Choose an index for lookup tuple(s) by a 'filter'. The filter holds fields
196
214
--- values of object(s) we want to find. It uses prebuilt `lookup_index_name`
197
215
--- table representing available indexes, which created by the
230
248
---
231
249
--- @treturn string `index_name` is name of the found index or nil
232
250
---
233
- --- @treturn table `value_list` is values list from the `filter` argument
234
- --- ordered in the such way that can be passed to the found index (has some
235
- --- meaning only when `index_name ~= nil`)
251
+ --- @treturn table `new_filter` is the filter value w/o values extracted into
252
+ --- `value_list`
253
+ ---
254
+ --- @treturn table `value_list` (optional) is values list from the `filter`
255
+ --- argument ordered in the such way that can be passed to the found index (has
256
+ --- some meaning only when `index_name ~= nil`)
236
257
---
237
258
--- @treturn table `pivot` (optional) an offset argument represented depending
238
259
--- of a case: whether we'll lookup for the offset by an index; it is either
@@ -259,6 +280,10 @@ local get_index_name = function(self, collection_name, from, filter, args)
259
280
assert (type (lookup_index_name ) == ' table' ,
260
281
' lookup_index_name must be a table, got ' .. type (lookup_index_name ))
261
282
283
+ local parts_tree = index_cache .parts_tree
284
+ assert (type (parts_tree ) == ' table' ,
285
+ ' parts_tree must be a table, got ' .. type (parts_tree ))
286
+
262
287
local connection_indexes = index_cache .connection_indexes
263
288
assert (type (connection_indexes ) == ' table' ,
264
289
' connection_indexes must be a table, got ' .. type (connection_indexes ))
@@ -276,6 +301,7 @@ local get_index_name = function(self, collection_name, from, filter, args)
276
301
assert (connection_type ~= nil , ' connection_type must not be nil' )
277
302
local full_match = connection_type == ' 1:1' and next (filter ) == nil
278
303
local value_list = from .destination_args_values
304
+ local new_filter = filter
279
305
280
306
local pivot
281
307
if args .offset ~= nil then
@@ -296,21 +322,22 @@ local get_index_name = function(self, collection_name, from, filter, args)
296
322
pivot = {filter = pivot_filter }
297
323
end
298
324
299
- return full_match , index_name , value_list , pivot
325
+ return full_match , index_name , new_filter , value_list , pivot
300
326
end
301
327
302
328
-- The 'fast offset' case. Here we fetch top-level objects starting from
303
329
-- passed offset. Select will be performed by the primary index and
304
330
-- corresponding offset in `pivot.value_list`, then the result will be
305
- -- postprocessed using `filter `, if necessary.
331
+ -- postprocessed using `new_filter `, if necessary.
306
332
if args .offset ~= nil then
307
333
local index_name , index_meta = get_primary_index_meta (self ,
308
334
collection_name )
309
335
local full_match
310
336
local pivot_value_list
337
+ local new_filter = filter
311
338
if type (args .offset ) == ' table' then
312
- full_match , pivot_value_list = flatten_filter (self , args . offset ,
313
- collection_name , index_name )
339
+ full_match , pivot_value_list , new_filter = flatten_filter (self ,
340
+ args . offset , collection_name , index_name )
314
341
assert (full_match == true , ' offset by a partial key is forbidden' )
315
342
else
316
343
assert (# index_meta .fields == 1 ,
@@ -320,22 +347,34 @@ local get_index_name = function(self, collection_name, from, filter, args)
320
347
end
321
348
local pivot = {value_list = pivot_value_list }
322
349
full_match = full_match and next (filter ) == nil
323
- return full_match , index_name , filter , pivot
350
+ return full_match , index_name , new_filter , nil , pivot
324
351
end
325
352
326
353
-- The 'no offset' case. Here we fetch top-level object either by found
327
354
-- index or using full scan (if the index was not found).
355
+
356
+ -- try to find full index
328
357
local name_list_str = filter_names_fingerprint (filter )
329
358
assert (lookup_index_name [collection_name ] ~= nil ,
330
359
(' cannot find any index for collection "%s"' ):format (collection_name ))
331
360
local index_name = lookup_index_name [collection_name ][name_list_str ]
332
361
local full_match = false
333
362
local value_list = nil
363
+ local new_filter = filter
364
+
365
+ -- try to find partial index
366
+ if index_name == nil then
367
+ local root = parts_tree [collection_name ]
368
+ index_name = get_best_matched_index (root , filter )
369
+ end
370
+
371
+ -- fill full_match and value_list appropriatelly
334
372
if index_name ~= nil then
335
- full_match , value_list = flatten_filter (self , filter , collection_name ,
336
- index_name )
373
+ full_match , value_list , new_filter = flatten_filter (self , filter ,
374
+ collection_name , index_name )
337
375
end
338
- return full_match , index_name , value_list
376
+
377
+ return full_match , index_name , new_filter , value_list
339
378
end
340
379
341
380
--- Build `lookup_index_name` table (part of `index_cache`) to use in the
@@ -402,6 +441,34 @@ local function build_lookup_index_name(indexes)
402
441
return lookup_index_name
403
442
end
404
443
444
+ --- Build `parts_tree` to use in @{get_index_name} for lookup best matching
445
+ --- index.
446
+ ---
447
+ --- @tparam table indexes indexes metainformation as defined in the @{new}
448
+ --- function
449
+ ---
450
+ --- @treturn table `roots` resulting parts prefix tree
451
+ local function build_index_parts_tree (indexes )
452
+ local roots = {}
453
+
454
+ for collection_name , indexes_meta in pairs (indexes ) do
455
+ local root = {}
456
+ roots [collection_name ] = root
457
+ for index_name , index_meta in pairs (indexes_meta ) do
458
+ local cur = root
459
+ for _ , field in ipairs (index_meta .fields ) do
460
+ cur .successors = cur .successors or {}
461
+ cur .successors [field ] = cur .successors [field ] or {}
462
+ cur = cur .successors [field ]
463
+ cur .index_names = cur .index_names or {}
464
+ cur .index_names [# cur .index_names + 1 ] = index_name
465
+ end
466
+ end
467
+ end
468
+
469
+ return roots
470
+ end
471
+
405
472
--- Build `connection_indexes` table (part of `index_cache`) to use in the
406
473
--- @{get_index_name} function.
407
474
---
491
558
local function build_index_cache (indexes , collections )
492
559
return {
493
560
lookup_index_name = build_lookup_index_name (indexes ),
561
+ parts_tree = build_index_parts_tree (indexes ),
494
562
connection_indexes = build_connection_indexes (indexes , collections ),
495
563
}
496
564
end
@@ -682,8 +750,8 @@ local function select_internal(self, collection_name, from, filter, args, extra)
682
750
(' cannot find collection "%s"' ):format (collection_name ))
683
751
684
752
-- search for suitable index
685
- local full_match , index_name , index_value , pivot = get_index_name (
686
- self , collection_name , from , filter , args )
753
+ local full_match , index_name , filter , index_value , pivot = get_index_name (
754
+ self , collection_name , from , filter , args ) -- we redefine filter here
687
755
local index = index_name ~= nil and
688
756
self .funcs .get_index (collection_name , index_name ) or nil
689
757
if from ~= nil then
0 commit comments