Skip to content

Commit 2471ea5

Browse files
author
Steven Allen
authored
Merge pull request #14 from libp2p/fix/memory-improvements
combine writes and avoid a few more allocations
2 parents 90cce79 + bde2585 commit 2471ea5

File tree

3 files changed

+33
-25
lines changed

3 files changed

+33
-25
lines changed

chan.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ func (s *Chan) ReadFromWithPool(r io.Reader, p *pool.BufferPool) {
3838
// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all
3939
// messages, ands sends them down the channel.
4040
func (s *Chan) readFrom(mr Reader) {
41-
// single reader, no need for Mutex
42-
mr.(*reader).lock = new(nullLocker)
43-
4441
Loop:
4542
for {
4643
buf, err := mr.ReadMsg()
@@ -74,8 +71,6 @@ func (s *Chan) WriteTo(w io.Writer) {
7471
// if bottleneck, cycle around a set of buffers
7572
mw := NewWriter(w)
7673

77-
// single writer, no need for Mutex
78-
mw.(*writer).lock = new(nullLocker)
7974
Loop:
8075
for {
8176
select {

msgio.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,20 @@ type ReadWriteCloser interface {
7676
type writer struct {
7777
W io.Writer
7878

79-
lock sync.Locker
79+
pool *pool.BufferPool
80+
lock sync.Mutex
8081
}
8182

8283
// NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer
8384
// will write the length prefix of every message written.
8485
func NewWriter(w io.Writer) WriteCloser {
85-
return &writer{W: w, lock: new(sync.Mutex)}
86+
return NewWriterWithPool(w, pool.GlobalPool)
87+
}
88+
89+
// NewWriterWithPool is identical to NewWriter but allows the user to pass a
90+
// custom buffer pool.
91+
func NewWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
92+
return &writer{W: w, pool: p}
8693
}
8794

8895
func (s *writer) Write(msg []byte) (int, error) {
@@ -96,10 +103,13 @@ func (s *writer) Write(msg []byte) (int, error) {
96103
func (s *writer) WriteMsg(msg []byte) (err error) {
97104
s.lock.Lock()
98105
defer s.lock.Unlock()
99-
if err := WriteLen(s.W, len(msg)); err != nil {
100-
return err
101-
}
102-
_, err = s.W.Write(msg)
106+
107+
buf := s.pool.Get(len(msg) + lengthSize)
108+
NBO.PutUint32(buf, uint32(len(msg)))
109+
copy(buf[lengthSize:], msg)
110+
_, err = s.W.Write(buf)
111+
s.pool.Put(buf)
112+
103113
return err
104114
}
105115

@@ -114,10 +124,10 @@ func (s *writer) Close() error {
114124
type reader struct {
115125
R io.Reader
116126

117-
lbuf []byte
127+
lbuf [lengthSize]byte
118128
next int
119129
pool *pool.BufferPool
120-
lock sync.Locker
130+
lock sync.Mutex
121131
max int // the maximal message size (in bytes) this reader handles
122132
}
123133

@@ -137,10 +147,8 @@ func NewReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
137147
}
138148
return &reader{
139149
R: r,
140-
lbuf: make([]byte, lengthSize),
141150
next: -1,
142151
pool: p,
143-
lock: new(sync.Mutex),
144152
max: defaultMaxSize,
145153
}
146154
}
@@ -156,7 +164,7 @@ func (s *reader) NextMsgLen() (int, error) {
156164

157165
func (s *reader) nextMsgLen() (int, error) {
158166
if s.next == -1 {
159-
n, err := ReadLen(s.R, s.lbuf)
167+
n, err := ReadLen(s.R, s.lbuf[:])
160168
if err != nil {
161169
return 0, err
162170
}

varint.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,21 @@ import (
1212
type varintWriter struct {
1313
W io.Writer
1414

15-
lbuf [binary.MaxVarintLen64]byte // for encoding varints
16-
lock sync.Mutex // for threadsafe writes
15+
pool *pool.BufferPool
16+
lock sync.Mutex // for threadsafe writes
1717
}
1818

1919
// NewVarintWriter wraps an io.Writer with a varint msgio framed writer.
2020
// The msgio.Writer will write the length prefix of every message written
2121
// as a varint, using https://golang.org/pkg/encoding/binary/#PutUvarint
2222
func NewVarintWriter(w io.Writer) WriteCloser {
23+
return NewVarintWriterWithPool(w, pool.GlobalPool)
24+
}
25+
26+
func NewVarintWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
2327
return &varintWriter{
24-
W: w,
28+
pool: p,
29+
W: w,
2530
}
2631
}
2732

@@ -37,12 +42,12 @@ func (s *varintWriter) WriteMsg(msg []byte) error {
3742
s.lock.Lock()
3843
defer s.lock.Unlock()
3944

40-
length := uint64(len(msg))
41-
n := binary.PutUvarint(s.lbuf[:], length)
42-
if _, err := s.W.Write(s.lbuf[:n]); err != nil {
43-
return err
44-
}
45-
_, err := s.W.Write(msg)
45+
buf := s.pool.Get(len(msg) + binary.MaxVarintLen64)
46+
n := binary.PutUvarint(buf, uint64(len(msg)))
47+
n += copy(buf[n:], msg)
48+
_, err := s.W.Write(buf[:n])
49+
s.pool.Put(buf)
50+
4651
return err
4752
}
4853

0 commit comments

Comments
 (0)