Skip to content
This repository was archived by the owner on Apr 14, 2022. It is now read-only.

Commit 02d41d9

Browse files
committed
Add max_batch_size option for bfs_executor
1 parent 2b3fd5b commit 02d41d9

File tree

2 files changed

+102
-37
lines changed

2 files changed

+102
-37
lines changed

graphql/bfs_executor.lua

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
--- }
177177
--- }
178178

179+
local json = require('json')
179180
local utils = require('graphql.utils')
180181
local core_util = require('graphql.core.util')
181182
local core_types = require('graphql.core.types')
@@ -725,14 +726,20 @@ local function invoke_resolve_list(prepared_object_list, context, opts)
725726
local qcontext = opts.qcontext
726727
local accessor = opts.accessor
727728
local is_item_cache_only = opts.is_item_cache_only or false
728-
729-
fetch_resolve_list(prepared_object_list, {accessor = accessor,
730-
qcontext = qcontext})
729+
local max_batch_size = opts.max_batch_size
731730

732731
local open_set = {}
733732
local cache_only_open_set = {}
734733

735-
for _, prepared_object in ipairs(prepared_object_list) do
734+
local last_fetched_object_num = 0
735+
for i, prepared_object in ipairs(prepared_object_list) do
736+
if i > last_fetched_object_num then
737+
local _, size = fetch_resolve_list(prepared_object_list,
738+
{accessor = accessor, qcontext = qcontext,
739+
max_batch_size = max_batch_size, start_from = i})
740+
last_fetched_object_num = last_fetched_object_num + size
741+
end
742+
736743
local child = invoke_resolve(prepared_object, context,
737744
{qcontext = qcontext, is_item_cache_only = is_item_cache_only})
738745
local child_open_set = child.open_set
@@ -753,23 +760,26 @@ end
753760
-- Analyze prepared requests and prefetch in batches {{{
754761

755762
fetch_first_same = function(open_set, opts)
763+
local func_name = 'bfs_executor.fetch_first_same'
756764
local opts = opts or {}
757765
local accessor = opts.accessor
758766
local qcontext = opts.qcontext
767+
local max_batch_size = opts.max_batch_size
759768

760769
if not accessor:cache_is_supported() then return nil, 0 end
761770

762771
local size = 0
763772

764773
local batches = {}
765774
for i, item in ipairs(open_set) do
775+
if i > max_batch_size then break end
766776
if item.prepared_object == nil then break end
767777
local prepared_object = item.prepared_object
768778

769779
for field_name, field_info in pairs(prepared_object.fields_info) do
770780
local prepared_resolve = field_info.prepared_resolve
771781
if prepared_resolve.is_calculated then
772-
-- don't update size, because we don't add request to 'batches'
782+
size = i
773783
break
774784
end
775785
local prepared_select = prepared_resolve.prepared_select
@@ -780,7 +790,9 @@ fetch_first_same = function(open_set, opts)
780790
local iterator_opts = request_opts.iterator_opts
781791

782792
if i == 1 then
783-
assert(batches[field_name] == nil, 'XXX') -- XXX
793+
assert(batches[field_name] == nil,
794+
('internal error: %s: field names "%s" clash'):format(
795+
func_name, field_name))
784796
batches[field_name] = {
785797
collection_name = collection_name,
786798
index_name = index_name,
@@ -792,10 +804,8 @@ fetch_first_same = function(open_set, opts)
792804
local ok =
793805
batches[field_name].collection_name == collection_name and
794806
batches[field_name].index_name == index_name and
795-
utils.is_subtable(batches[field_name].iterator_opts or {},
796-
iterator_opts or {}) and
797-
utils.is_subtable(iterator_opts or {},
798-
batches[field_name].iterator_opts or {})
807+
utils.are_tables_same(batches[field_name].iterator_opts or
808+
{}, iterator_opts or {})
799809
if not ok then break end -- XXX: continue here and return first
800810
-- non-match instead of size?
801811
table.insert(batches[field_name].keys, key)
@@ -805,28 +815,35 @@ fetch_first_same = function(open_set, opts)
805815
end
806816

807817
-- don't flood cache with single-key (non-batch) select results
808-
if size == 1 then
809-
return nil, 1
818+
if size <= 1 then
819+
return nil, size
810820
end
811821

812822
local fetch_id = accessor:cache_fetch(batches, qcontext)
813823
return fetch_id, size
814824
end
815825

816-
-- XXX: is the function redundant because of fetch_first_same?
817826
fetch_resolve_list = function(prepared_object_list, opts)
827+
local func_name = 'bfs_executor.fetch_resolve_list'
818828
local opts = opts or {}
819829
local accessor = opts.accessor
820830
local qcontext = opts.qcontext
831+
local max_batch_size = opts.max_batch_size
832+
local start_from = opts.start_from or 1
821833

822-
if not accessor:cache_is_supported() then return nil end
834+
if not accessor:cache_is_supported() then return nil, 0 end
835+
836+
local size = 0
823837

824838
local batches = {}
825-
for i, prepared_object in ipairs(prepared_object_list) do
839+
for i = 1, #prepared_object_list - start_from + 1 do
840+
if i > max_batch_size then break end
841+
local prepared_object = prepared_object_list[i + start_from - 1]
842+
826843
for field_name, field_info in pairs(prepared_object.fields_info) do
827844
local prepared_resolve = field_info.prepared_resolve
828845
if prepared_resolve.is_calculated then
829-
-- don't update size, because we don't add request to 'batches'
846+
size = i
830847
break
831848
end
832849
local prepared_select = prepared_resolve.prepared_select
@@ -837,33 +854,49 @@ fetch_resolve_list = function(prepared_object_list, opts)
837854
local iterator_opts = request_opts.iterator_opts
838855

839856
if i == 1 then
840-
assert(batches[field_name] == nil, 'XXX') -- XXX
857+
assert(batches[field_name] == nil,
858+
('internal error: %s: field names "%s" clash'):format(
859+
func_name, field_name))
841860
batches[field_name] = {
842861
collection_name = collection_name,
843862
index_name = index_name,
844863
keys = {key},
845864
iterator_opts = iterator_opts,
846865
}
866+
size = i
847867
else
848868
assert(batches[field_name].collection_name == collection_name,
849-
'XXX') -- XXX
869+
('internal error: %s: prepared object list has ' ..
870+
'different collection names: "%s" and "%s"'):format(
871+
func_name, batches[field_name].collection_name,
872+
collection_name))
850873
assert(batches[field_name].index_name == index_name,
851-
'XXX') -- XXX
852-
assert(utils.is_subtable(batches[field_name].iterator_opts or
853-
{}, iterator_opts or {}), 'XXX') -- XXX
854-
assert(utils.is_subtable(iterator_opts or {},
855-
batches[field_name].iterator_opts or {}), 'XXX') -- XXX
874+
('internal error: %s: prepared object list has ' ..
875+
'different index names: "%s" and "%s"'):format(func_name,
876+
tostring(batches[field_name].index_name),
877+
tostring(index_name)))
878+
local ok = utils.are_tables_same(batches[field_name].iterator_opts or
879+
{}, iterator_opts or {})
880+
if not ok then -- avoid extra json.encode()
881+
assert(ok, ('internal error: %s: prepared object list ' ..
882+
'has different iterator options: "%s" and "%s"'):format(
883+
func_name,
884+
json.encode(batches[field_name].iterator_opts),
885+
json.encode(iterator_opts)))
886+
end
856887
table.insert(batches[field_name].keys, key)
888+
size = i
857889
end
858890
end
859891
end
860892

861-
if next(batches) == nil then
862-
return nil
893+
-- don't flood cache with single-key (non-batch) select results
894+
if size <= 1 then
895+
return nil, size
863896
end
864897

865898
local fetch_id = accessor:cache_fetch(batches, qcontext)
866-
return fetch_id
899+
return fetch_id, size
867900
end
868901

869902
-- }}}
@@ -937,12 +970,14 @@ end
937970
---
938971
--- * qcontext
939972
--- * accessor
973+
--- * max_batch_size
940974
---
941975
--- @treturn table result of the query
942976
function bfs_executor.execute(schema, query_ast, variables, operation_name, opts)
943977
local opts = opts or {}
944978
local qcontext = opts.qcontext
945979
local accessor = opts.accessor
980+
local max_batch_size = opts.max_batch_size
946981

947982
local operation = core_query_util.getOperation(query_ast, operation_name)
948983
local root_object_type = schema[operation.operation]
@@ -1024,7 +1059,8 @@ function bfs_executor.execute(schema, query_ast, variables, operation_name, opts
10241059
elseif item.prepared_object_list ~= nil then
10251060
local child = invoke_resolve_list(item.prepared_object_list,
10261061
context, {qcontext = qcontext, accessor = accessor,
1027-
is_item_cache_only = is_item_cache_only})
1062+
is_item_cache_only = is_item_cache_only,
1063+
max_batch_size = max_batch_size})
10281064
local child_open_set = child.open_set
10291065
local child_cache_only_open_set = child.cache_only_open_set
10301066
utils.expand_list(cache_only_open_set, child_cache_only_open_set)
@@ -1033,7 +1069,8 @@ function bfs_executor.execute(schema, query_ast, variables, operation_name, opts
10331069
local open_set_to_fetch = is_item_cache_only and
10341070
cache_only_open_set or open_set
10351071
local fetch_id, size = fetch_first_same(open_set_to_fetch,
1036-
{accessor = accessor, qcontext = qcontext})
1072+
{accessor = accessor, qcontext = qcontext,
1073+
max_batch_size = max_batch_size})
10371074
if #open_set_to_fetch > 0 then
10381075
table.insert(open_set_to_fetch, math.max(2, size + 1), {
10391076
squash_marker = true,

graphql/impl.lua

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ local check = utils.check
2020

2121
local impl = {}
2222

23+
-- constants
24+
local DEF_MAX_BATCH_SIZE = 1000
25+
2326
-- Instance of the library to provide graphql:compile() and graphql:execute()
2427
-- method (with creating zero configuration graphql instance under hood when
2528
-- calling compile() for the first time).
@@ -36,13 +39,16 @@ local default_instance
3639
--- @treturn table result of the operation
3740
local function gql_execute(qstate, variables, operation_name)
3841
assert(qstate.state)
42+
assert(qstate.query_settings)
3943
local state = qstate.state
4044
assert(state.schema)
45+
local max_batch_size = qstate.query_settings.max_batch_size or
46+
state.max_batch_size
4147

4248
check(variables, 'variables', 'table')
4349
check(operation_name, 'operation_name', 'string', 'nil')
50+
check(max_batch_size, 'max_batch_size', 'number')
4451

45-
assert(qstate.query_settings)
4652
local qcontext = {
4753
query_settings = qstate.query_settings,
4854
}
@@ -77,6 +83,7 @@ local function gql_execute(qstate, variables, operation_name)
7783
variables, operation_name, {
7884
qcontext = qcontext,
7985
accessor = accessor,
86+
max_batch_size = max_batch_size,
8087
})
8188
local executor_metainfo = {
8289
name = 'bfs',
@@ -131,6 +138,27 @@ local function compile_and_execute(state, query, variables, operation_name,
131138
return compiled_query:execute(variables, operation_name)
132139
end
133140

141+
local function validate_query_settings(query_settings, opts)
142+
local opts = opts or {}
143+
local allow_nil = opts.allow_nil or false
144+
145+
local use_bfs_executor = query_settings.use_bfs_executor
146+
local max_batch_size = query_settings.max_batch_size
147+
148+
if not allow_nil or type(use_bfs_executor) ~= 'nil' then
149+
check(use_bfs_executor, 'use_bfs_executor', 'string')
150+
assert(use_bfs_executor == 'never' or
151+
use_bfs_executor == 'shard' or
152+
use_bfs_executor == 'always',
153+
"use_bfs_executor must be 'never', 'shard' (default) or " ..
154+
"'always', 'got " .. tostring(use_bfs_executor))
155+
end
156+
157+
if not allow_nil or type(max_batch_size) ~= 'nil' then
158+
check(max_batch_size, 'max_batch_size', 'number')
159+
end
160+
end
161+
134162
--- Parse GraphQL query string, validate against the GraphQL schema and
135163
--- provide an object with the function to execute an operation from the
136164
--- request with specific variables values.
@@ -140,12 +168,13 @@ end
140168
--- @tparam string query text of a GraphQL query
141169
---
142170
--- @tparam[opt] table opts the following options (described in
143-
--- @{accessor_general.new}):
171+
--- @{accessor_general.new} and @{impl.new}):
144172
---
145173
--- * resulting_object_cnt_max
146174
--- * fetched_object_cnt_max
147175
--- * timeout_ms
148176
--- * use_bfs_executor
177+
--- * max_batch_size
149178
---
150179
--- @treturn table compiled query with `execute` and `avro_schema` functions
151180
local function gql_compile(state, query, opts)
@@ -168,11 +197,13 @@ local function gql_compile(state, query, opts)
168197
fetched_object_cnt_max = opts.fetched_object_cnt_max,
169198
timeout_ms = opts.timeout_ms,
170199
use_bfs_executor = opts.use_bfs_executor,
200+
max_batch_size = opts.max_batch_size,
171201
}
172202
}
173203

174204
accessor_general.validate_query_settings(qstate.query_settings,
175205
{allow_nil = true})
206+
validate_query_settings(qstate.query_settings, {allow_nil = true})
176207

177208
local gql_query = setmetatable(qstate, {
178209
__index = {
@@ -323,7 +354,8 @@ end
323354
--- timeout_ms = <number>,
324355
--- enable_mutations = <boolean>,
325356
--- disable_dangling_check = <boolean>,
326-
--- use_bfs_executor = 'never' | 'shard' (default) | 'always'
357+
--- use_bfs_executor = 'never' | 'shard' (default) | 'always',
358+
--- max_batch_size = <number>,
327359
--- })
328360
function impl.new(cfg)
329361
local cfg = cfg or {}
@@ -361,16 +393,12 @@ function impl.new(cfg)
361393

362394
check(cfg.disable_dangling_check, 'disable_dangling_check', 'boolean',
363395
'nil')
364-
check(cfg.use_bfs_executor, 'use_bfs_executor', 'string', 'nil')
365-
assert(cfg.use_bfs_executor == 'never' or cfg.use_bfs_executor == 'shard'
366-
or cfg.use_bfs_executor == 'always' or cfg.use_bfs_executor == nil,
367-
"use_bfs_executor must be 'never', 'shard' (default) or 'always', '" ..
368-
"got " .. tostring(cfg.use_bfs_executor))
369-
370396
local state = {
371397
disable_dangling_check = cfg.disable_dangling_check,
372398
use_bfs_executor = cfg.use_bfs_executor or 'shard',
399+
max_batch_size = cfg.max_batch_size or DEF_MAX_BATCH_SIZE,
373400
}
401+
validate_query_settings(state)
374402
convert_schema.convert(state, cfg)
375403
return setmetatable(state, {
376404
__index = {

0 commit comments

Comments (0)