This repository was archived by the owner on Apr 14, 2022. It is now read-only.

Commit abfec4f

bfs_executor: optimize two connections case

1 parent 2f83a99 commit abfec4f
File tree

4 files changed: +214 -91 lines changed
graphql/accessor_shard_cache.lua

Lines changed: 10 additions & 25 deletions
@@ -1,6 +1,7 @@
 local json = require('json')
 local shard = require('shard')
 local utils = require('graphql.utils')
+local request_batch = require('graphql.request_batch')
 local accessor_shard_index_info = require('graphql.accessor_shard_index_info')
 
 local check = utils.check
@@ -10,27 +11,15 @@ local accessor_shard_cache = {}
 local function get_select_ids(self, batch, opts)
     local opts = opts or {}
     local skip_cached = opts.skip_cached or false
-    local ids = {}
-    for _, key in ipairs(batch.keys) do
-        local collection_name = batch.collection_name
-        local index_name = batch.index_name or ''
-        local key_str
-        if type(key) ~= 'table' then
-            key_str = tostring(key)
-        else
-            assert(utils.is_array(key), 'compound key must be an array')
-            key_str = table.concat(key, ',')
-        end
-        local opts = batch.iterator_opts or {}
-        local opts_str = ('%s,%s'):format(opts.iterator or opts[1] or 'EQ',
-            opts.limit or '')
-        local id = ('%s.%s.%s.%s'):format(collection_name, index_name, key_str,
-            opts_str)
-        if not skip_cached or self.cache[id] == nil then
-            table.insert(ids, id)
+
+    local skip_function
+    if skip_cached then
+        skip_function = function(id)
+            return self.cache[id] ~= nil
         end
     end
-    return ids
+
+    return batch:select_ids(skip_function)
 end
 
 local function net_box_call_wrapper(conn, func_name, call_args)
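
The refactor above replaces the inline cache check with a skip predicate handed to the batch. A minimal standalone sketch of that contract in plain Lua; the cache table and ids below are hypothetical, not the repo's API:

    -- Hypothetical cache keyed by select id; one of the two ids is cached.
    local cache = {['user_collection.user_index.42.EQ,'] = {}}

    -- Mirrors batch:select_ids(skip_function): keep ids the predicate
    -- does not skip.
    local function select_ids(ids, skip_function)
        local res = {}
        for _, id in ipairs(ids) do
            if skip_function == nil or not skip_function(id) then
                table.insert(res, id)
            end
        end
        return res
    end

    local skip_function = function(id) return cache[id] ~= nil end
    local ids = select_ids({'user_collection.user_index.42.EQ,',
        'user_collection.user_index.43.EQ,'}, skip_function)
    print(#ids) -- 1: only the uncached id remains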
@@ -268,12 +257,8 @@ end
 --- @return luafun iterator (one value) to fetched data or nil
 local function cache_lookup(self, collection_name, index_name, key,
         iterator_opts)
-    local batch = {
-        collection_name = collection_name,
-        index_name = index_name,
-        keys = {key or box.NULL},
-        iterator_opts = iterator_opts,
-    }
+    local batch = request_batch.new(collection_name, index_name,
+        {key or box.NULL}, iterator_opts)
    local id = get_select_ids(self, batch)[1]
    if self.cache[id] == nil then
        return nil
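
For reference, a hedged sketch of the constructor call that replaces the literal table above; the collection name, index name, and key are invented, and it runs only inside Tarantool with this repository on package.path:

    local request_batch = require('graphql.request_batch')

    -- A one-key batch, as cache_lookup now builds it.
    local batch = request_batch.new('user_collection', 'user_index', {42},
        {iterator = 'EQ'})
    print(batch:select_ids()[1]) -- user_collection.user_index.42.EQ,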

graphql/bfs_executor.lua

Lines changed: 87 additions & 66 deletions
@@ -176,13 +176,13 @@
 --- }
 --- }
 
-local json = require('json')
 local utils = require('graphql.utils')
 local core_util = require('graphql.core.util')
 local core_types = require('graphql.core.types')
 local core_query_util = require('graphql.core.query_util')
 local core_validate_variables = require('graphql.core.validate_variables')
 local core_introspection = require('graphql.core.introspection')
+local request_batch = require('graphql.request_batch')
 
 -- XXX: Possible cache_only requests refactoring. Maybe just set
 -- `is_cache_only` flag in prepared_object, find such request in within the
@@ -201,13 +201,9 @@ local core_introspection = require('graphql.core.introspection')
 -- Maybe we also can remove separate `is_list = true` case processing over the
 -- code that can havily simplify things.
 
--- XXX: Currently the executor cannot batch similar requests in case when a
--- collection has two connections and so the requests to the different
--- connected (destination) collections placed in the list each after each. The
--- simplest way to mitigate this issue is to consume similar requests (in
--- `fetch_first_same`) w/o respect to the order (traverse until end of the
--- `open_set` or a count limit). The alternative is to introduce resorting
--- prepared requests step.
+-- XXX: It would be more natural to have list of tables with field_info content
+-- + field_name instead of fields_info to handle each prepared resolve
+-- separatelly and add ability to reorder it.
 
 local bfs_executor = {}
 
@@ -784,34 +780,19 @@ fetch_first_same = function(open_set, opts)
             size = i
             break
         end
-        local prepared_select = prepared_resolve.prepared_select
-        local request_opts = prepared_select.request_opts
-        local collection_name = prepared_select.collection_name
-        local index_name = request_opts.index_name
-        local key = request_opts.index_value or box.NULL
-        local iterator_opts = request_opts.iterator_opts
+        local batch = request_batch.from_prepared_resolve(prepared_resolve)
 
         if i == 1 then
             assert(batches[field_name] == nil,
                 ('internal error: %s: field names "%s" clash'):format(
                 func_name, field_name))
-            batches[field_name] = {
-                collection_name = collection_name,
-                index_name = index_name,
-                keys = {key},
-                iterator_opts = iterator_opts,
-            }
+            batches[field_name] = batch
             size = i
         else
-            local ok =
-                batches[field_name] ~= nil and
-                batches[field_name].collection_name == collection_name and
-                batches[field_name].index_name == index_name and
-                utils.are_tables_same(batches[field_name].iterator_opts or
-                    {}, iterator_opts or {})
-            if not ok then break end -- XXX: continue here and return first
-                                     -- non-match instead of size?
-            table.insert(batches[field_name].keys, key)
+            local ok = batches[field_name] ~= nil and
+                batches[field_name]:compare_bins(batch)
+            if not ok then break end
+            table.insert(batches[field_name].keys, batch.keys[1])
             size = i
         end
     end
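
The effect of the loop above, as a standalone plain-Lua sketch: consecutive requests whose batches share a bin get their keys folded into one batch, so a single storage call can serve all of them. The field and bin values here are hypothetical:

    local batches = {}
    local requests = {
        {field_name = 'user', bin = 'user_collection.user_index.EQ,', key = 1},
        {field_name = 'user', bin = 'user_collection.user_index.EQ,', key = 2},
    }

    for _, req in ipairs(requests) do
        if batches[req.field_name] == nil then
            batches[req.field_name] = {bin = req.bin, keys = {req.key}}
        elseif batches[req.field_name].bin == req.bin then
            table.insert(batches[req.field_name].keys, req.key)
        else
            break -- stop batching at the first non-matching request
        end
    end

    print(#batches.user.keys) -- 2: both keys go into one request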
@@ -850,45 +831,20 @@ fetch_resolve_list = function(prepared_object_list, opts)
             size = i
             break
         end
-        local prepared_select = prepared_resolve.prepared_select
-        local request_opts = prepared_select.request_opts
-        local collection_name = prepared_select.collection_name
-        local index_name = request_opts.index_name
-        local key = request_opts.index_value or box.NULL
-        local iterator_opts = request_opts.iterator_opts
+        local batch = request_batch.from_prepared_resolve(prepared_resolve)
 
         if i == 1 then
             assert(batches[field_name] == nil,
                 ('internal error: %s: field names "%s" clash'):format(
                 func_name, field_name))
-            batches[field_name] = {
-                collection_name = collection_name,
-                index_name = index_name,
-                keys = {key},
-                iterator_opts = iterator_opts,
-            }
+            batches[field_name] = batch
             size = i
         else
-            assert(batches[field_name].collection_name == collection_name,
-                ('internal error: %s: prepared object list has ' ..
-                'different collection names: "%s" and "%s"'):format(
-                func_name, batches[field_name].collection_name,
-                collection_name))
-            assert(batches[field_name].index_name == index_name,
-                ('internal error: %s: prepared object list has ' ..
-                'different index names: "%s" and "%s"'):format(func_name,
-                tostring(batches[field_name].index_name),
-                tostring(index_name)))
-            local ok = utils.are_tables_same(batches[field_name].iterator_opts or
-                {}, iterator_opts or {})
-            if not ok then -- avoid extra json.encode()
-                assert(ok, ('internal error: %s: prepared object list ' ..
-                    'has different iterator options: "%s" and "%s"'):format(
-                    func_name,
-                    json.encode(batches[field_name].iterator_opts),
-                    json.encode(iterator_opts)))
+            local ok, err = batches[field_name]:compare_bins_extra(batch)
+            if not ok then
+                error(('internal error: %s: %s'):format(func_name, err))
             end
-            table.insert(batches[field_name].keys, key)
+            table.insert(batches[field_name].keys, batch.keys[1])
             size = i
         end
     end
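
One detail worth noting: the chain of asserts becomes the usual Lua `ok, err` convention, with the caller prepending context before raising. A tiny self-contained illustration of that pattern; compare_bins_like and all names here are hypothetical stand-ins:

    local function compare_bins_like(a, b)
        if a ~= b then
            return false, ('different collection names: "%s" and "%s"')
                :format(a, b)
        end
        return true
    end

    local ok, err = compare_bins_like('user_collection', 'order_collection')
    if not ok then
        -- raises: internal error: fetch_resolve_list: different collection
        -- names: "user_collection" and "order_collection"
        error(('internal error: %s: %s'):format('fetch_resolve_list', err))
    end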
@@ -905,6 +861,69 @@ end
 
 -- }}}
 
+-- Reorder requests before add to open_set {{{
+
+local function expand_open_set(open_set, child_open_set, opts)
+    local opts = opts or {}
+    local accessor = opts.accessor
+
+    if not accessor:cache_is_supported() then
+        utils.expand_list(open_set, child_open_set)
+        return
+    end
+
+    local item_bin_to_ordinal = {}
+    local items_per_ordinal = {}
+    local next_ordinal = 1
+
+    -- Create histogram-like 'items_per_ordinal' structure with lists of items.
+    -- Each list contain items of the same kind (with the same bin value).
+    -- Ordinals of the bins are assigned in order of appear in child_open_set.
+    for _, item in ipairs(child_open_set) do
+        if item.prepared_object_list ~= nil then
+            local ordinal = next_ordinal
+            assert(items_per_ordinal[ordinal] == nil)
+            items_per_ordinal[ordinal] = {}
+            next_ordinal = next_ordinal + 1
+            table.insert(items_per_ordinal[ordinal], item)
+        else
+            local prepared_object = item.prepared_object
+            assert(prepared_object ~= nil)
+            assert(prepared_object.fields_info ~= nil)
+
+            local batch_bins = {}
+            for field_name, field_info in pairs(prepared_object.fields_info) do
+                local prepared_resolve = field_info.prepared_resolve
+                if prepared_resolve.is_calculated then
+                    table.insert(batch_bins, field_name .. ':<calculated>')
+                else
+                    local batch = request_batch.from_prepared_resolve(
+                        prepared_resolve)
+                    table.insert(batch_bins, field_name .. ':' .. batch:bin())
+                end
+            end
+
+            local item_bin = table.concat(batch_bins, ';')
+            local ordinal = item_bin_to_ordinal[item_bin]
+            if ordinal == nil then
+                item_bin_to_ordinal[item_bin] = next_ordinal
+                ordinal = next_ordinal
+                assert(items_per_ordinal[ordinal] == nil)
+                items_per_ordinal[ordinal] = {}
+                next_ordinal = next_ordinal + 1
+            end
+            table.insert(items_per_ordinal[ordinal], item)
+        end
+    end
+
+    -- add items from child_open_set in ordinals order to open_set
+    for _, items in ipairs(items_per_ordinal) do
+        utils.expand_list(open_set, items)
+    end
+end
+
+-- }}}
+
 -- Debugging {{{
 
 local function prepared_object_digest(prepared_object)
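
This is the core of the "two connections" optimization: when a collection has two connections, requests to the two destination collections alternate in child_open_set, which used to defeat `fetch_first_same`. A standalone plain-Lua sketch of the stable bucketing above, with hypothetical strings standing in for `batch:bin()` values:

    local child_open_set = {
        {bin = 'user.idx.EQ,'}, {bin = 'order.idx.EQ,'},
        {bin = 'user.idx.EQ,'}, {bin = 'order.idx.EQ,'},
    }

    local item_bin_to_ordinal = {}
    local items_per_ordinal = {}
    local next_ordinal = 1

    -- Bucket items by bin; ordinals follow first appearance, so the
    -- reordering is stable.
    for _, item in ipairs(child_open_set) do
        local ordinal = item_bin_to_ordinal[item.bin]
        if ordinal == nil then
            ordinal = next_ordinal
            item_bin_to_ordinal[item.bin] = ordinal
            items_per_ordinal[ordinal] = {}
            next_ordinal = next_ordinal + 1
        end
        table.insert(items_per_ordinal[ordinal], item)
    end

    -- Flatten the buckets: equal-bin items are now adjacent.
    local open_set = {}
    for _, items in ipairs(items_per_ordinal) do
        for _, item in ipairs(items) do
            table.insert(open_set, item)
        end
    end

    for _, item in ipairs(open_set) do print(item.bin) end
    -- user.idx.EQ, / user.idx.EQ, / order.idx.EQ, / order.idx.EQ,

Once equal-bin items are adjacent, `fetch_first_same` can consume them as a single batch instead of stopping at the first mismatch.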
@@ -1056,19 +1075,21 @@ function bfs_executor.execute(schema, query_ast, variables, operation_name, opts
                 local child = invoke_resolve(item.prepared_object, context,
                     {qcontext = qcontext,
                     is_item_cache_only = is_item_cache_only})
-                local child_open_set = child.open_set
                 local child_cache_only_open_set = child.cache_only_open_set
-                utils.expand_list(cache_only_open_set, child_cache_only_open_set)
-                utils.expand_list(open_set, child_open_set)
+                local child_open_set = child.open_set
+                expand_open_set(cache_only_open_set, child_cache_only_open_set,
+                    {accessor = accessor})
+                expand_open_set(open_set, child_open_set, {accessor = accessor})
             elseif item.prepared_object_list ~= nil then
                 local child = invoke_resolve_list(item.prepared_object_list,
                     context, {qcontext = qcontext, accessor = accessor,
                     is_item_cache_only = is_item_cache_only,
                     max_batch_size = max_batch_size})
-                local child_open_set = child.open_set
                 local child_cache_only_open_set = child.cache_only_open_set
-                utils.expand_list(cache_only_open_set, child_cache_only_open_set)
-                utils.expand_list(open_set, child_open_set)
+                local child_open_set = child.open_set
+                expand_open_set(cache_only_open_set, child_cache_only_open_set,
+                    {accessor = accessor})
+                expand_open_set(open_set, child_open_set, {accessor = accessor})
             elseif item.squash_marker ~= nil then
                 local open_set_to_fetch = is_item_cache_only and
                     cache_only_open_set or open_set

graphql/request_batch.lua

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+local json = require('json')
+local utils = require('graphql.utils')
+
+local request_batch = {}
+
+local function iterator_opts_tostring(iterator_opts)
+    return ('%s,%s'):format(
+        iterator_opts.iterator or iterator_opts[1] or 'EQ',
+        iterator_opts.limit or '')
+end
+
+--- List of strings, each uniquely identifies select request in the batch.
+local function batch_select_ids(self, skip_function)
+    local ids = {}
+
+    local collection_name = self.collection_name
+    local index_name = self.index_name or ''
+    local iterator_opts_str = iterator_opts_tostring(self.iterator_opts)
+
+    for _, key in ipairs(self.keys) do
+        local key_str
+        if type(key) ~= 'table' then
+            key_str = tostring(key)
+        else
+            assert(utils.is_array(key), 'compound key must be an array')
+            key_str = table.concat(key, ',')
+        end
+        local id = ('%s.%s.%s.%s'):format(collection_name, index_name, key_str,
+            iterator_opts_str)
+        if skip_function == nil or not skip_function(id) then
+            table.insert(ids, id)
+        end
+    end
+
+    return ids
+end
+
+--- String uniquely identifies the batch information except keys.
+local function batch_bin(self)
+    return ('%s.%s.%s'):format(
+        self.collection_name,
+        self.index_name or '',
+        iterator_opts_tostring(self.iterator_opts))
+end
+
+--- Compare batches by bin.
+local function batch_compare_bins(self, other)
+    return self.collection_name == other.collection_name and
+        self.index_name == other.index_name and
+        utils.are_tables_same(self.iterator_opts, other.iterator_opts)
+end
+
+--- Compare batches by bin with detailed error in case they don't match.
+local function batch_compare_bins_extra(self, other)
+    if self.collection_name ~= other.collection_name then
+        local err = ('prepared object list has different collection names: ' ..
+            '"%s" and "%s"'):format(self.collection_name, other.collection_name)
+        return false, err
+    end
+
+    if self.index_name ~= other.index_name then
+        local err = ('prepared object list has different index names: ' ..
+            '"%s" and "%s"'):format(tostring(self.index_name),
+            tostring(other.index_name))
+        return false, err
+    end
+
+    if not utils.are_tables_same(self.iterator_opts, other.iterator_opts) then
+        local err = ('prepared object list has different iterator options: ' ..
+            '"%s" and "%s"'):format(json.encode(self.iterator_opts),
+            json.encode(other.iterator_opts))
+        return false, err
+    end
+
+    return true
+end
+
+local request_batch_mt = {
+    __index = {
+        bin = batch_bin,
+        select_ids = batch_select_ids,
+        compare_bins = batch_compare_bins,
+        compare_bins_extra = batch_compare_bins_extra,
+    }
+}
+
+function request_batch.from_prepared_resolve(prepared_resolve)
+    assert(not prepared_resolve.is_calculated)
+    local prepared_select = prepared_resolve.prepared_select
+    local request_opts = prepared_select.request_opts
+    local collection_name = prepared_select.collection_name
+    local index_name = request_opts.index_name
+    local key = request_opts.index_value or box.NULL
+    local iterator_opts = request_opts.iterator_opts
+
+    return setmetatable({
+        collection_name = collection_name,
+        index_name = index_name,
+        keys = {key},
+        iterator_opts = iterator_opts or {},
+    }, request_batch_mt)
+end
+
+function request_batch.new(collection_name, index_name, keys, iterator_opts)
+    return setmetatable({
+        collection_name = collection_name,
+        index_name = index_name,
+        keys = keys,
+        iterator_opts = iterator_opts or {},
+    }, request_batch_mt)
+end
+
+return request_batch
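
A hedged usage sketch of the new module as a whole (assumes Tarantool with this repository on package.path; the collection, index, and keys are invented):

    local request_batch = require('graphql.request_batch')

    local a = request_batch.new('user_collection', 'user_str_num_index',
        {{'user_str', 42}}, {iterator = 'EQ'})
    local b = request_batch.new('user_collection', 'user_str_num_index',
        {{'user_str', 43}}, {iterator = 'EQ'})

    print(a:bin())           -- user_collection.user_str_num_index.EQ,
    print(a:compare_bins(b)) -- true: same bin, only the keys differ
    print(a:select_ids()[1]) -- user_collection.user_str_num_index.user_str,42.EQ,

The bin deliberately excludes the keys: `compare_bins` is what `fetch_first_same` now relies on to decide whether a request can join an existing batch, while `select_ids` keeps per-key identity for the cache.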
