Skip to content

Commit 8f508ae

Browse files
committed
plan: compute all inner joins in memory if they fit
Fixes src-d#577 Because we do not have a way to estimate the cost of each side of a join, it is really difficult to know when we can compute one in memory. But not doing so causes inner joins to be painfully slow, as one of the branches is iterated multiple times. This PR addresses this by ensuring that if the right branch of the inner join fits in memory, it will be computed in memory even if the in-memory mode has not been activated by the user. A user can set the maximum amount of memory the gitbase server can use before joins are no longer performed in memory, using the `MAX_MEMORY_INNER_JOIN` environment variable or the `max_memory_joins` session variable, specifying the number of bytes. The default value for this is one fifth of the total physical memory on the operating system. Because previously we had two iterators, `innerJoinIter` and `innerJoinMemoryIter`, and now `innerJoinIter` must be able to do the join in memory, `innerJoinMemoryIter` has been removed and `innerJoinIter` replaced with a version that can work in three modes: - `unknownMode`: we don't know yet how to perform the join, so we keep iterating until we can find out. By the end of the first full pass over the right branch, `unknownMode` will switch to either `multipassMode` or `memoryMode`. - `memoryMode`, which computes the rest of the join in memory. The iterator can be in this mode before it starts iterating if the user activated in-memory joins via session or environment variables, in which case it will load the whole right side in memory before doing any further iteration. If, instead, the iterator started in `unknownMode` and switched to this mode, it is guaranteed to have already loaded the whole right side. From that point on, both cases work in exactly the same way. - `multipassMode`, which was the previous default mode. It iterates the right side of the join once for each row in the left side. It is more expensive, but consumes less memory. 
The iterator cannot start in this mode; it can only be switched to it from `unknownMode` when the memory used by the gitbase server exceeds the maximum amount of memory, whether set by the user or left at the default. Signed-off-by: Miguel Molina <[email protected]>
1 parent b32d2fd commit 8f508ae

File tree

2 files changed

+203
-133
lines changed

2 files changed

+203
-133
lines changed

Diff for: sql/plan/innerjoin.go

+170-100
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,52 @@ import (
44
"io"
55
"os"
66
"reflect"
7+
"runtime"
8+
"strconv"
9+
"strings"
710

811
opentracing "github.com/opentracing/opentracing-go"
12+
"github.com/pbnjay/memory"
13+
"github.com/sirupsen/logrus"
914
"gopkg.in/src-d/go-mysql-server.v0/sql"
1015
)
1116

12-
const experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
13-
const inMemoryJoinSessionVar = "inmemory_joins"
17+
const (
18+
experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
19+
maxMemoryJoinKey = "MAX_MEMORY_INNER_JOIN"
20+
inMemoryJoinSessionVar = "inmemory_joins"
21+
memoryThresholdSessionVar = "max_memory_joins"
22+
)
23+
24+
var (
25+
useInMemoryJoins = shouldUseMemoryJoinsByEnv()
26+
// One fifth of the total physical memory available on the OS (ignoring the
27+
// memory used by other processes).
28+
defaultMemoryThreshold = memory.TotalMemory() / 5
29+
// Maximum amount of memory the gitbase server can have in use before
30+
// considering all inner joins should be done using multipass mode.
31+
maxMemoryJoin = loadMemoryThreshold()
32+
)
33+
34+
func shouldUseMemoryJoinsByEnv() bool {
35+
v := strings.TrimSpace(strings.ToLower(os.Getenv(experimentalInMemoryJoinKey)))
36+
return v == "on" || v == "1"
37+
}
38+
39+
func loadMemoryThreshold() uint64 {
40+
v, ok := os.LookupEnv(maxMemoryJoinKey)
41+
if !ok {
42+
return defaultMemoryThreshold
43+
}
1444

15-
var useInMemoryJoins = os.Getenv(experimentalInMemoryJoinKey) != ""
45+
n, err := strconv.ParseUint(v, 10, 64)
46+
if err != nil {
47+
logrus.Warnf("invalid value %q given to %s environment variable", v, maxMemoryJoinKey)
48+
return defaultMemoryThreshold
49+
}
50+
51+
return n
52+
}
1653

1754
// InnerJoin is an inner join between two tables.
1855
type InnerJoin struct {
@@ -73,27 +110,17 @@ func (j *InnerJoin) RowIter(ctx *sql.Context) (sql.RowIter, error) {
73110
inMemorySession = true
74111
}
75112

76-
var iter sql.RowIter
113+
var mode = unknownMode
77114
if useInMemoryJoins || inMemorySession {
78-
r, err := j.Right.RowIter(ctx)
79-
if err != nil {
80-
span.Finish()
81-
return nil, err
82-
}
115+
mode = memoryMode
116+
}
83117

84-
iter = &innerJoinMemoryIter{
85-
l: l,
86-
r: r,
87-
ctx: ctx,
88-
cond: j.Cond,
89-
}
90-
} else {
91-
iter = &innerJoinIter{
92-
l: l,
93-
rp: j.Right,
94-
ctx: ctx,
95-
cond: j.Cond,
96-
}
118+
iter := &innerJoinIter{
119+
l: l,
120+
rp: j.Right,
121+
ctx: ctx,
122+
cond: j.Cond,
123+
mode: mode,
97124
}
98125

99126
return sql.NewSpanIter(span, iter), nil
@@ -156,6 +183,25 @@ func (j *InnerJoin) TransformExpressions(f sql.TransformExprFunc) (sql.Node, err
156183
return NewInnerJoin(j.Left, j.Right, cond), nil
157184
}
158185

186+
// innerJoinMode defines the mode in which an inner join will be performed.
187+
type innerJoinMode byte
188+
189+
const (
190+
// unknownMode is the default mode. It will start iterating without really
191+
// knowing in which mode it will end up computing the inner join. If it
192+
// iterates the right side fully one time and so far it fits in memory,
193+
// then it will switch to memory mode. Otherwise, if at some point during
194+
// this first iteration it finds that it does not fit in memory, it will
195+
// switch to multipass mode.
196+
unknownMode innerJoinMode = iota
197+
// memoryMode computes all the inner join directly in memory iterating each
198+
// side of the join exactly once.
199+
memoryMode
200+
// multipassMode computes the inner join by iterating the left side once,
201+
// and the right side one time for each row in the left side.
202+
multipassMode
203+
)
204+
159205
type innerJoinIter struct {
160206
l sql.RowIter
161207
rp rowIterProvider
@@ -164,118 +210,140 @@ type innerJoinIter struct {
164210
cond sql.Expression
165211

166212
leftRow sql.Row
167-
}
168213

169-
func (i *innerJoinIter) Next() (sql.Row, error) {
170-
for {
171-
if i.leftRow == nil {
172-
r, err := i.l.Next()
173-
if err != nil {
174-
return nil, err
175-
}
214+
// used to compute in-memory
215+
mode innerJoinMode
216+
right []sql.Row
217+
pos int
218+
}
176219

177-
i.leftRow = r
220+
func (i *innerJoinIter) loadLeft() error {
221+
if i.leftRow == nil {
222+
r, err := i.l.Next()
223+
if err != nil {
224+
return err
178225
}
179226

180-
if i.r == nil {
181-
iter, err := i.rp.RowIter(i.ctx)
182-
if err != nil {
183-
return nil, err
184-
}
227+
i.leftRow = r
228+
}
185229

186-
i.r = iter
187-
}
230+
return nil
231+
}
188232

189-
rightRow, err := i.r.Next()
190-
if err == io.EOF {
191-
i.r = nil
192-
i.leftRow = nil
193-
continue
233+
func (i *innerJoinIter) loadRightInMemory() error {
234+
iter, err := i.rp.RowIter(i.ctx)
235+
if err != nil {
236+
return err
237+
}
238+
239+
i.right, err = sql.RowIterToRows(iter)
240+
if err != nil {
241+
return err
242+
}
243+
244+
if len(i.right) == 0 {
245+
return io.EOF
246+
}
247+
248+
return nil
249+
}
250+
251+
func (i *innerJoinIter) fitsInMemory() bool {
252+
var maxMemory uint64
253+
_, v := i.ctx.Session.Get(memoryThresholdSessionVar)
254+
if n, ok := v.(int64); ok {
255+
maxMemory = uint64(n)
256+
} else {
257+
maxMemory = maxMemoryJoin
258+
}
259+
260+
if maxMemory <= 0 {
261+
return true
262+
}
263+
264+
var ms runtime.MemStats
265+
runtime.ReadMemStats(&ms)
266+
267+
return (ms.HeapInuse + ms.StackInuse) < maxMemory
268+
}
269+
270+
func (i *innerJoinIter) loadRight() (row sql.Row, skip bool, err error) {
271+
if i.mode == memoryMode {
272+
if len(i.right) == 0 {
273+
if err := i.loadRightInMemory(); err != nil {
274+
return nil, false, err
275+
}
194276
}
195277

196-
if err != nil {
197-
return nil, err
278+
if i.pos >= len(i.right) {
279+
i.leftRow = nil
280+
i.pos = 0
281+
return nil, true, nil
198282
}
199283

200-
var row = make(sql.Row, len(i.leftRow)+len(rightRow))
201-
copy(row, i.leftRow)
202-
copy(row[len(i.leftRow):], rightRow)
284+
row := i.right[i.pos]
285+
i.pos++
286+
return row, false, nil
287+
}
203288

204-
v, err := i.cond.Eval(i.ctx, row)
289+
if i.r == nil {
290+
iter, err := i.rp.RowIter(i.ctx)
205291
if err != nil {
206-
return nil, err
292+
return nil, false, err
207293
}
208294

209-
if v == true {
210-
return row, nil
211-
}
295+
i.r = iter
212296
}
213-
}
214297

215-
func (i *innerJoinIter) Close() error {
216-
if err := i.l.Close(); err != nil {
217-
if i.r != nil {
218-
_ = i.r.Close()
298+
rightRow, err := i.r.Next()
299+
if err != nil {
300+
if err == io.EOF {
301+
i.r = nil
302+
i.leftRow = nil
303+
304+
// If we got to this point and the mode is still unknown it means
305+
// the right side fits in memory, so the mode changes to memory
306+
// inner join.
307+
if i.mode == unknownMode {
308+
i.mode = memoryMode
309+
}
310+
311+
return nil, true, nil
219312
}
220-
return err
313+
return nil, false, err
221314
}
222315

223-
if i.r != nil {
224-
return i.r.Close()
316+
if i.mode == unknownMode {
317+
if !i.fitsInMemory() {
318+
i.right = nil
319+
i.mode = multipassMode
320+
} else {
321+
i.right = append(i.right, rightRow)
322+
}
225323
}
226324

227-
return nil
228-
}
229-
230-
type innerJoinMemoryIter struct {
231-
l sql.RowIter
232-
r sql.RowIter
233-
ctx *sql.Context
234-
cond sql.Expression
235-
pos int
236-
leftRow sql.Row
237-
right []sql.Row
325+
return rightRow, false, err
238326
}
239327

240-
func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
328+
func (i *innerJoinIter) Next() (sql.Row, error) {
241329
for {
242-
if i.leftRow == nil {
243-
r, err := i.l.Next()
244-
if err != nil {
245-
return nil, err
246-
}
247-
248-
i.leftRow = r
330+
if err := i.loadLeft(); err != nil {
331+
return nil, err
249332
}
250333

251-
if i.r != nil {
252-
for {
253-
row, err := i.r.Next()
254-
if err != nil {
255-
if err == io.EOF {
256-
break
257-
}
258-
return nil, err
259-
}
260-
261-
i.right = append(i.right, row)
262-
}
263-
i.r = nil
334+
rightRow, skip, err := i.loadRight()
335+
if err != nil {
336+
return nil, err
264337
}
265338

266-
if i.pos >= len(i.right) {
267-
i.pos = 0
268-
i.leftRow = nil
339+
if skip {
269340
continue
270341
}
271342

272-
rightRow := i.right[i.pos]
273343
var row = make(sql.Row, len(i.leftRow)+len(rightRow))
274344
copy(row, i.leftRow)
275345
copy(row[len(i.leftRow):], rightRow)
276346

277-
i.pos++
278-
279347
v, err := i.cond.Eval(i.ctx, row)
280348
if err != nil {
281349
return nil, err
@@ -287,7 +355,7 @@ func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
287355
}
288356
}
289357

290-
func (i *innerJoinMemoryIter) Close() error {
358+
func (i *innerJoinIter) Close() error {
291359
if err := i.l.Close(); err != nil {
292360
if i.r != nil {
293361
_ = i.r.Close()
@@ -299,5 +367,7 @@ func (i *innerJoinMemoryIter) Close() error {
299367
return i.r.Close()
300368
}
301369

370+
i.right = nil
371+
302372
return nil
303373
}

0 commit comments

Comments
 (0)