From 0407fd082e8e265a65eea55069d2cc1d0d603b68 Mon Sep 17 00:00:00 2001 From: atercattus Date: Thu, 8 Apr 2021 09:25:41 +0300 Subject: [PATCH 1/3] renamed resultsetCount param in mysql/NewResultset to more correct fieldsCount and made Resultset.reset public --- mysql/resultset.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/mysql/resultset.go b/mysql/resultset.go index 068d0073f..f244b7d06 100644 --- a/mysql/resultset.go +++ b/mysql/resultset.go @@ -27,9 +27,9 @@ var ( } ) -func NewResultset(resultsetCount int) *Resultset { +func NewResultset(fieldsCount int) *Resultset { r := resultsetPool.Get().(*Resultset) - r.reset(resultsetCount) + r.Reset(fieldsCount) return r } @@ -37,7 +37,7 @@ func (r *Resultset) returnToPool() { resultsetPool.Put(r) } -func (r *Resultset) reset(count int) { +func (r *Resultset) Reset(fieldsCount int) { r.RawPkg = r.RawPkg[:0] r.Fields = r.Fields[:0] @@ -52,14 +52,14 @@ func (r *Resultset) reset(count int) { r.FieldNames = make(map[string]int) } - if count == 0 { + if fieldsCount == 0 { return } - if cap(r.Fields) < count { - r.Fields = make([]*Field, count) + if cap(r.Fields) < fieldsCount { + r.Fields = make([]*Field, fieldsCount) } else { - r.Fields = r.Fields[:count] + r.Fields = r.Fields[:fieldsCount] } } From 41b8fbe2b6a334de2aaa33583f7d2f2c7bcc4e6a Mon Sep 17 00:00:00 2001 From: atercattus Date: Thu, 8 Apr 2021 09:26:29 +0300 Subject: [PATCH 2/3] add client/Conn.ExecuteSelectStreaming() for yet more memory allocation optimization --- client/conn.go | 25 ++++++++++++++ client/resp.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/client/conn.go b/client/conn.go index e26680f16..8229d22b7 100644 --- a/client/conn.go +++ b/client/conn.go @@ -33,6 +33,9 @@ type Conn struct { connectionID uint32 } +// This function will be called for every row in resultset from ExecuteSelectStreaming. +type SelectPerRowCallback func(row []FieldValue) error + func getNetProto(addr string) string { proto := "tcp" if strings.Contains(addr, "/") { @@ -165,6 +168,28 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) { } } +// ExecuteSelectStreaming will call perRowCallback for every row in resultset +// WITHOUT saving any row data to Result.{Values/RawPkg/RowDatas} fields. +// +// ExecuteSelectStreaming should be used only for SELECT queries with a large response resultset for memory preserving. +// +// Example: +// +// var result mysql.Result +// conn.ExecuteSelectStreaming(`SELECT ... LIMIT 100500`, &result, func(row []mysql.FieldValue) error { +// // Use the row as you want. +// // You must not save FieldValue.AsString() value after this callback is done. Copy it if you need. +// return nil +// }) +// +func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback) error { + if err := c.writeCommandStr(COM_QUERY, command); err != nil { + return errors.Trace(err) + } + + return c.readResultStreaming(false, result, perRowCallback) +} + func (c *Conn) Begin() error { _, err := c.exec("BEGIN") return errors.Trace(err) diff --git a/client/resp.go b/client/resp.go index 392b9fdac..583b80264 100644 --- a/client/resp.go +++ b/client/resp.go @@ -233,6 +233,25 @@ func (c *Conn) readResult(binary bool) (*Result, error) { return c.readResultset(firstPkgBuf, binary) } +func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback) error { + firstPkgBuf, err := c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0]) + defer utils.ByteSlicePut(firstPkgBuf) + + if err != nil { + return errors.Trace(err) + } + + if firstPkgBuf[0] == OK_HEADER { + return ErrMalformPacket // Streaming allowed only for SELECT queries + } else if firstPkgBuf[0] == ERR_HEADER { + return c.handleErrorPacket(append([]byte{}, firstPkgBuf...)) + } else if firstPkgBuf[0] == LocalInFile_HEADER { + return ErrMalformPacket + } + + return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb) +} + func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) { // column count count, _, n := LengthEncodedInt(data) @@ -256,6 +275,31 @@ func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) { return result, nil } +func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback) error { + columnCount, _, n := LengthEncodedInt(data) + + if n-len(data) != 0 { + return ErrMalformPacket + } + + if result.Resultset == nil { + result.Resultset = NewResultset(int(columnCount)) + } else { + // Reuse memory if can + result.Reset(int(columnCount)) + } + + if err := c.readResultColumns(result); err != nil { + return errors.Trace(err) + } + + if err := c.readResultRowsStreaming(result, binary, perRowCb); err != nil { + return errors.Trace(err) + } + + return nil +} + func (c *Conn) readResultColumns(result *Result) (err error) { var i int = 0 var data []byte @@ -344,3 +388,47 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) { return nil } + +func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) (err error) { + var ( + data []byte + row []FieldValue + ) + + for { + data, err = c.ReadPacketReuseMem(data[:0]) + if err != nil { + return + } + + // EOF Packet + if c.isEOFPacket(data) { + if c.capability&CLIENT_PROTOCOL_41 > 0 { + // result.Warnings = binary.LittleEndian.Uint16(data[1:]) + // todo add strict_mode, warning will be treat as error + result.Status = binary.LittleEndian.Uint16(data[3:]) + c.status = result.Status + } + + break + } + + if data[0] == ERR_HEADER { + return c.handleErrorPacket(data) + } + + // Parse this row + row, err = RowData(data).Parse(result.Fields, isBinary, row) + if err != nil { + return errors.Trace(err) + } + + // Send the row to "userland" code + err = perRowCb(row) + if err != nil { + return errors.Trace(err) + } + } + + return nil +} From 85c47742fcf58cea76aa473d3fff7e62de1ce054 Mon Sep 17 00:00:00 2001 From: atercattus Date: Thu, 8 Apr 2021 09:28:57 +0300 Subject: [PATCH 3/3] update README.md --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 42717b4b2..c95f4cfae 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,27 @@ Tested MySQL versions for the client include: - 5.7.x - 8.0.x +### Example for SELECT streaming (v.1.1.1) +You can use also streaming for large SELECT responses. +The callback function will be called for every result row without storing the whole resultset in memory. +`result.Fields` will be filled before the first callback call. + +```go +// ... +var result mysql.Result +err := conn.ExecuteSelectStreaming(`select id, name from table LIMIT 100500`, &result, func(row []mysql.FieldValue) error { + for idx, val := range row { + field := result.Fields[idx] + // You must not save FieldValue.AsString() value after this callback is done. + // Copy it if you need. + // ... + } + return false, nil +}) + +// ... +``` + ## Server Server package supplies a framework to implement a simple MySQL server which can handle the packets from the MySQL client.