Skip to content

Commit aed7fc4

Browse files
authored
perf: use pooled buffers for message writes (#507)
1 parent 9c56b2d commit aed7fc4

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

comm.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package pubsub
22

33
import (
4-
"bufio"
54
"context"
5+
"encoding/binary"
66
"io"
77
"time"
88

99
"github.com/gogo/protobuf/proto"
10+
pool "github.com/libp2p/go-buffer-pool"
11+
"github.com/multiformats/go-varint"
1012

1113
"github.com/libp2p/go-libp2p/core/network"
1214
"github.com/libp2p/go-libp2p/core/peer"
@@ -156,16 +158,20 @@ func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
156158
}
157159

158160
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
159-
bufw := bufio.NewWriter(s)
160-
wc := protoio.NewDelimitedWriter(bufw)
161+
writeRpc := func(rpc *RPC) error {
162+
size := uint64(rpc.Size())
161163

162-
writeMsg := func(msg proto.Message) error {
163-
err := wc.WriteMsg(msg)
164+
buf := pool.Get(varint.UvarintSize(size) + int(size))
165+
defer pool.Put(buf)
166+
167+
n := binary.PutUvarint(buf, size)
168+
_, err := rpc.MarshalTo(buf[n:])
164169
if err != nil {
165170
return err
166171
}
167172

168-
return bufw.Flush()
173+
_, err = s.Write(buf)
174+
return err
169175
}
170176

171177
defer s.Close()
@@ -176,7 +182,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
176182
return
177183
}
178184

179-
err := writeMsg(&rpc.RPC)
185+
err := writeRpc(rpc)
180186
if err != nil {
181187
s.Reset()
182188
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)

0 commit comments

Comments
 (0)