Skip to content

Commit d6f5ffa

Browse files
authored
Merge pull request #596 from skoef/selectStreaming
extended ExecuteSelectStreaming
2 parents 7fd0cb3 + 06a439e commit d6f5ffa

File tree

8 files changed

+67
-12
lines changed

8 files changed

+67
-12
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ err := conn.ExecuteSelectStreaming(`select id, name from table LIMIT 100500`, &r
237237
// Copy it if you need.
238238
// ...
239239
}
240-
return false, nil
241-
})
240+
return nil
241+
}, nil)
242242

243243
// ...
244244
```

client/conn.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type Conn struct {
3939
// This function will be called for every row in resultset from ExecuteSelectStreaming.
4040
type SelectPerRowCallback func(row []FieldValue) error
4141

42+
// This function will be called once per result from ExecuteSelectStreaming
43+
type SelectPerResultCallback func(result *Result) error
44+
4245
func getNetProto(addr string) string {
4346
proto := "tcp"
4447
if strings.Contains(addr, "/") {
@@ -183,6 +186,7 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
183186

184187
// ExecuteSelectStreaming will call perRowCallback for every row in resultset
185188
// WITHOUT saving any row data to Result.{Values/RawPkg/RowDatas} fields.
189+
// When given, perResultCallback will be called once per result
186190
//
187191
// ExecuteSelectStreaming should be used only for SELECT queries with a large response resultset for memory preserving.
188192
//
@@ -193,14 +197,14 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
193197
// // Use the row as you want.
194198
// // You must not save FieldValue.AsString() value after this callback is done. Copy it if you need.
195199
// return nil
196-
// })
200+
// }, nil)
197201
//
198-
func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback) error {
202+
func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback, perResultCallback SelectPerResultCallback) error {
199203
if err := c.writeCommandStr(COM_QUERY, command); err != nil {
200204
return errors.Trace(err)
201205
}
202206

203-
return c.readResultStreaming(false, result, perRowCallback)
207+
return c.readResultStreaming(false, result, perRowCallback, perResultCallback)
204208
}
205209

206210
func (c *Conn) Begin() error {

client/resp.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (c *Conn) readResult(binary bool) (*Result, error) {
233233
return c.readResultset(firstPkgBuf, binary)
234234
}
235235

236-
func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback) error {
236+
func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error {
237237
firstPkgBuf, err := c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0])
238238
defer utils.ByteSlicePut(firstPkgBuf)
239239

@@ -267,7 +267,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP
267267
return ErrMalformPacket
268268
}
269269

270-
return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb)
270+
return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb, perResCb)
271271
}
272272

273273
func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
@@ -293,7 +293,7 @@ func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
293293
return result, nil
294294
}
295295

296-
func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback) error {
296+
func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error {
297297
columnCount, _, n := LengthEncodedInt(data)
298298

299299
if n-len(data) != 0 {
@@ -307,14 +307,26 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
307307
result.Reset(int(columnCount))
308308
}
309309

310+
// this is a streaming resultset
311+
result.Resultset.Streaming = true
312+
310313
if err := c.readResultColumns(result); err != nil {
311314
return errors.Trace(err)
312315
}
313316

317+
if perResCb != nil {
318+
if err := perResCb(result); err != nil {
319+
return err
320+
}
321+
}
322+
314323
if err := c.readResultRowsStreaming(result, binary, perRowCb); err != nil {
315324
return errors.Trace(err)
316325
}
317326

327+
// this resultset is done streaming
328+
result.Resultset.StreamingDone = true
329+
318330
return nil
319331
}
320332

client/stmt.go

+8
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
3939
return s.conn.readResult(true)
4040
}
4141

42+
func (s *Stmt) ExecuteSelectStreaming(result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback, args ...interface{}) error {
43+
if err := s.write(args...); err != nil {
44+
return errors.Trace(err)
45+
}
46+
47+
return s.conn.readResultStreaming(true, result, perRowCb, perResCb)
48+
}
49+
4250
func (s *Stmt) Close() error {
4351
if err := s.conn.writeCommandUint32(COM_STMT_CLOSE, s.id); err != nil {
4452
return errors.Trace(err)

mysql/resultset.go

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Resultset struct {
1717
RawPkg []byte
1818

1919
RowDatas []RowData
20+
21+
Streaming bool
22+
StreamingDone bool
2023
}
2124

2225
var (

mysql/resultset_helper.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/siddontang/go/hack"
99
)
1010

11-
func formatTextValue(value interface{}) ([]byte, error) {
11+
func FormatTextValue(value interface{}) ([]byte, error) {
1212
switch v := value.(type) {
1313
case int8:
1414
return strconv.AppendInt(nil, int64(v), 10), nil
@@ -165,7 +165,7 @@ func BuildSimpleTextResultset(names []string, values [][]interface{}) (*Resultse
165165
return nil, errors.Errorf("row types aren't consistent")
166166
}
167167
}
168-
b, err = formatTextValue(value)
168+
b, err = FormatTextValue(value)
169169

170170
if err != nil {
171171
return nil, errors.Trace(err)

server/command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (c *Conn) HandleCommand() error {
4444

4545
v := c.dispatch(data)
4646

47-
err = c.writeValue(v)
47+
err = c.WriteValue(v)
4848

4949
if c.Conn != nil {
5050
c.ResetSequence()

server/resp.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ func (c *Conn) writeAuthMoreDataFastAuth() error {
116116
}
117117

118118
func (c *Conn) writeResultset(r *Resultset) error {
119+
// for a streaming resultset, that handled rowdata separately in a callback
120+
// of type SelectPerRowCallback, we can suffice by ending the stream with
121+
// an EOF
122+
if r.StreamingDone {
123+
return c.writeEOF()
124+
}
125+
119126
columnLen := PutLengthEncodedInt(uint64(len(r.Fields)))
120127

121128
data := make([]byte, 4, 1024)
@@ -129,6 +136,12 @@ func (c *Conn) writeResultset(r *Resultset) error {
129136
return err
130137
}
131138

139+
// streaming resultsets handle rowdata in a separate callback of type
140+
// SelectPerRowCallback so we're done here
141+
if r.Streaming {
142+
return nil
143+
}
144+
132145
for _, v := range r.RowDatas {
133146
data = data[0:4]
134147
data = append(data, v...)
@@ -163,10 +176,23 @@ func (c *Conn) writeFieldList(fs []*Field, data []byte) error {
163176
return nil
164177
}
165178

179+
func (c *Conn) writeFieldValues(fv []FieldValue) error {
180+
data := make([]byte, 4, 1024)
181+
for _, v := range fv {
182+
tv, err := FormatTextValue(v.Value())
183+
if err != nil {
184+
return err
185+
}
186+
data = append(data, PutLengthEncodedString(tv)...)
187+
}
188+
189+
return c.WritePacket(data)
190+
}
191+
166192
type noResponse struct{}
167193
type eofResponse struct{}
168194

169-
func (c *Conn) writeValue(value interface{}) error {
195+
func (c *Conn) WriteValue(value interface{}) error {
170196
switch v := value.(type) {
171197
case noResponse:
172198
return nil
@@ -184,6 +210,8 @@ func (c *Conn) writeValue(value interface{}) error {
184210
}
185211
case []*Field:
186212
return c.writeFieldList(v, nil)
213+
case []FieldValue:
214+
return c.writeFieldValues(v)
187215
case *Stmt:
188216
return c.writePrepare(v)
189217
default:

0 commit comments

Comments
 (0)