This repository was archived by the owner on Jan 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 109
/
Copy pathhandler.go
239 lines (196 loc) · 5.17 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package server
import (
"io"
"regexp"
"strconv"
"strings"
"sync"
"time"
errors "gopkg.in/src-d/go-errors.v1"
sqle "gopkg.in/src-d/go-mysql-server.v0"
"gopkg.in/src-d/go-mysql-server.v0/auth"
"gopkg.in/src-d/go-mysql-server.v0/sql"
"github.com/sirupsen/logrus"
"gopkg.in/src-d/go-vitess.v1/mysql"
"gopkg.in/src-d/go-vitess.v1/sqltypes"
"gopkg.in/src-d/go-vitess.v1/vt/proto/query"
)
// regKillCmd matches a lower-cased KILL statement. Group 1 is the optional
// modifier ("query" or "connection") and group 2 is the numeric id.
var regKillCmd = regexp.MustCompile(`^kill (?:(query|connection) )?(\d+)$`)

// errConnectionNotFound is returned when a KILL statement targets a
// connection that cannot be found. Its argument is the numeric connection
// ID, so it is formatted with %d; %c would render the ID as a Unicode
// character instead of a number.
var errConnectionNotFound = errors.NewKind("Connection not found: %d")

// rowsBatch is the number of rows accumulated in a result before it is
// handed to the ComQuery callback.
// TODO parametrize
const rowsBatch = 100
// Handler is a connection handler for a SQLe engine.
type Handler struct {
// mu is held by the Handler's methods while they read or modify c.
mu sync.Mutex
// e is the engine that queries are executed on.
e *sqle.Engine
// sm is the session manager used to build query contexts and to close
// sessions when a connection goes away.
sm *SessionManager
// c holds the currently tracked connections, keyed by connection ID.
c map[uint32]*mysql.Conn
}
// NewHandler builds a Handler bound to the given SQLe engine and session
// manager. The returned Handler starts with an empty connection table.
func NewHandler(e *sqle.Engine, sm *SessionManager) *Handler {
	h := new(Handler)
	h.e = e
	h.sm = sm
	h.c = make(map[uint32]*mysql.Conn)
	return h
}
// NewConnection is invoked when a client connection has been established.
// The connection is recorded under its ID unless an entry for that ID is
// already present, in which case the existing entry is kept.
func (h *Handler) NewConnection(c *mysql.Conn) {
	h.mu.Lock()
	_, known := h.c[c.ConnectionID]
	if !known {
		h.c[c.ConnectionID] = c
	}
	h.mu.Unlock()

	logrus.Infof("NewConnection: client %v", c.ConnectionID)
}
// ConnectionClosed is invoked when a client connection has gone away. It
// closes the session held by the session manager, forgets the connection and
// asks the catalog to unlock any tables associated with the connection ID.
// A failure to unlock is logged rather than propagated.
func (h *Handler) ConnectionClosed(c *mysql.Conn) {
	h.sm.CloseConn(c)

	h.mu.Lock()
	delete(h.c, c.ConnectionID)
	h.mu.Unlock()

	err := h.e.Catalog.UnlockTables(nil, c.ConnectionID)
	if err != nil {
		logrus.Errorf("unable to unlock tables on session close: %s", err)
	}

	logrus.Infof("ConnectionClosed: client %v", c.ConnectionID)
}
// ComQuery executes a SQL query on the SQLe engine and streams the result to
// callback in batches of at most rowsBatch rows.
//
// KILL statements are intercepted by handleKill and never reach the engine;
// for them callback receives a single empty result. Any other statement is
// run through the engine and, when the engine's Auth is an *auth.Audit, the
// elapsed time and final error are reported to it once ComQuery returns.
//
// The row iterator is closed on every exit path, including errors returned
// by the iterator itself or by callback.
func (h *Handler) ComQuery(
	c *mysql.Conn,
	query string,
	callback func(*sqltypes.Result) error,
) (err error) {
	ctx := h.sm.NewContextWithQuery(c, query)

	handled, err := h.handleKill(c, query)
	if err != nil {
		return err
	}

	if handled {
		return callback(&sqltypes.Result{})
	}

	start := time.Now()
	schema, rows, err := h.e.Query(ctx, query)
	// The closure reads the named result, so it reports whatever error the
	// function finally returns, not only the one produced by Query.
	defer func() {
		if q, ok := h.e.Auth.(*auth.Audit); ok {
			q.Query(ctx, time.Since(start), err)
		}
	}()

	if err != nil {
		return err
	}

	// Make sure the iterator is released when leaving early because of an
	// iterator or callback error. On the regular path it is closed
	// explicitly below so that its error can be returned to the caller.
	rowsClosed := false
	defer func() {
		if rowsClosed {
			return
		}
		if cerr := rows.Close(); cerr != nil {
			logrus.Errorf("unable to close row iterator: %s", cerr)
		}
	}()

	var r *sqltypes.Result
	var processedAtLeastOneBatch bool
	for {
		if r == nil {
			r = &sqltypes.Result{Fields: schemaToFields(schema)}
		}

		// A full batch is flushed before reading any further rows.
		if r.RowsAffected == rowsBatch {
			if err := callback(r); err != nil {
				return err
			}

			r = nil
			processedAtLeastOneBatch = true
			continue
		}

		row, err := rows.Next()
		if err != nil {
			if err == io.EOF {
				break
			}

			return err
		}

		r.Rows = append(r.Rows, rowToSQL(schema, row))
		r.RowsAffected++
	}

	rowsClosed = true
	if err := rows.Close(); err != nil {
		return err
	}

	// Even if r.RowsAffected = 0, the callback must be
	// called to update the state in the go-vitess' listener
	// and avoid returning errors when the query doesn't
	// produce any results.
	// The only case where the trailing call is skipped is an empty final
	// batch that follows at least one batch already delivered.
	if r != nil && (r.RowsAffected == 0 && processedAtLeastOneBatch) {
		return nil
	}

	return callback(r)
}
// WarningCount is called at the end of each query to obtain
// the value to be returned to the client in the EOF packet.
// Note that this will be called either in the context of the
// ComQuery callback if the result does not contain any fields,
// or after the last ComQuery call completes.
//
// It returns 0 when the session manager holds no session for the connection.
//
// NOTE(review): the sessions map is read here without taking any lock that
// is visible in this file — confirm against SessionManager whether that is
// safe.
func (h *Handler) WarningCount(c *mysql.Conn) uint16 {
	if sess, found := h.sm.sessions[c.ConnectionID]; found {
		return sess.WarningCount()
	}

	return 0
}
// handleKill checks whether query is a KILL statement and, if so, carries it
// out. The boolean result is true only when the statement was recognised and
// executed; statements that do not match regKillCmd yield (false, nil) so the
// caller can run them normally.
//
// KILL CONNECTION and KILL should close the connection. KILL QUERY only
// cancels the query.
//
// https://dev.mysql.com/doc/refman/8.0/en/kill.html
//
// KILL [CONNECTION | QUERY] processlist_id
//   - KILL QUERY terminates the statement the connection is currently
//     executing, but leaves the connection itself intact.
//   - KILL CONNECTION is the same as KILL with no modifier: it terminates the
//     connection associated with the given processlist_id, after terminating
//     any statement the connection is executing.
func (h *Handler) handleKill(conn *mysql.Conn, query string) (bool, error) {
	match := regKillCmd.FindStringSubmatch(strings.ToLower(query))
	if match == nil {
		return false, nil
	}

	pid, err := strconv.ParseUint(match[2], 10, 64)
	if err != nil {
		return false, err
	}

	// KILL QUERY: only the running statement is cancelled.
	if match[1] == "query" {
		logrus.Infof("kill query: id %d", pid)
		h.e.Catalog.Kill(pid)
		return true, nil
	}

	// KILL and KILL CONNECTION: the connection itself is torn down.
	connID, killed := h.e.Catalog.KillConnection(pid)
	if !killed {
		return false, errConnectionNotFound.New(connID)
	}
	logrus.Infof("kill connection: id %d, pid: %d", connID, pid)

	// Take the connection out of the table under the lock; it is closed
	// after the lock has been released.
	h.mu.Lock()
	target, tracked := h.c[connID]
	if tracked {
		delete(h.c, connID)
	}
	h.mu.Unlock()

	if !tracked {
		return false, errConnectionNotFound.New(connID)
	}

	h.sm.CloseConn(target)
	target.Close()
	return true, nil
}
// rowToSQL converts every value of row through the SQL method of the type of
// the schema column found at the same index, and returns the converted values
// in row order.
func rowToSQL(s sql.Schema, row sql.Row) []sqltypes.Value {
	values := make([]sqltypes.Value, len(row))
	for idx := range row {
		values[idx] = s[idx].Type.SQL(row[idx])
	}

	return values
}
// schemaToFields builds one query.Field per schema column, carrying the
// column name and type. Blob columns are tagged with the binary character
// set; all other columns with UTF-8.
func schemaToFields(s sql.Schema) []*query.Field {
	fields := make([]*query.Field, len(s))
	for idx, col := range s {
		var charset uint32 = mysql.CharacterSetUtf8
		if col.Type.Type() == mysql.TypeBlob {
			charset = mysql.CharacterSetBinary
		}

		field := new(query.Field)
		field.Name = col.Name
		field.Type = col.Type.Type()
		field.Charset = charset
		fields[idx] = field
	}

	return fields
}