Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 5628965

Browse files
authored
Merge pull request #143 from ipfs/fix/free-memory
aggressively free memory
2 parents 008c693 + 9bf38f7 commit 5628965

File tree

4 files changed

+42
-18
lines changed

4 files changed

+42
-18
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ require (
2222
github.com/ipfs/go-metrics-interface v0.0.1
2323
github.com/ipfs/go-peertaskqueue v0.1.1
2424
github.com/jbenet/goprocess v0.1.3
25+
github.com/libp2p/go-buffer-pool v0.0.2
2526
github.com/libp2p/go-libp2p v0.1.1
2627
github.com/libp2p/go-libp2p-core v0.0.3
2728
github.com/libp2p/go-libp2p-loggables v0.1.0
2829
github.com/libp2p/go-libp2p-netutil v0.1.0
2930
github.com/libp2p/go-libp2p-testing v0.0.4
30-
github.com/libp2p/go-msgio v0.0.3 // indirect
31+
github.com/libp2p/go-msgio v0.0.4
3132
github.com/mattn/go-colorable v0.1.2 // indirect
3233
github.com/multiformats/go-multiaddr v0.0.4
3334
github.com/opentracing/opentracing-go v1.1.0 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0
185185
github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU=
186186
github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0=
187187
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
188-
github.com/libp2p/go-msgio v0.0.3 h1:VsOlWispTivSsOMg70e0W77y6oiSBSRCyP6URrWvE04=
189-
github.com/libp2p/go-msgio v0.0.3/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
188+
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
189+
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
190190
github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI=
191191
github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI=
192192
github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw=

message/message.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package message
22

33
import (
4+
"encoding/binary"
45
"fmt"
56
"io"
67

78
pb "github.com/ipfs/go-bitswap/message/pb"
89
wantlist "github.com/ipfs/go-bitswap/wantlist"
910
blocks "github.com/ipfs/go-block-format"
1011

11-
ggio "github.com/gogo/protobuf/io"
1212
cid "github.com/ipfs/go-cid"
13+
pool "github.com/libp2p/go-buffer-pool"
14+
msgio "github.com/libp2p/go-msgio"
1315

1416
"github.com/libp2p/go-libp2p-core/network"
1517
)
@@ -170,18 +172,25 @@ func (m *impl) AddBlock(b blocks.Block) {
170172

171173
// FromNet generates a new BitswapMessage from incoming data on an io.Reader.
172174
func FromNet(r io.Reader) (BitSwapMessage, error) {
173-
pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
174-
return FromPBReader(pbr)
175+
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
176+
return FromMsgReader(reader)
175177
}
176178

177179
// FromPBReader generates a new Bitswap message from a gogo-protobuf reader
178-
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
179-
pb := new(pb.Message)
180-
if err := pbr.ReadMsg(pb); err != nil {
180+
func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
181+
msg, err := r.ReadMsg()
182+
if err != nil {
181183
return nil, err
182184
}
183185

184-
return newMessageFromProto(*pb)
186+
var pb pb.Message
187+
err = pb.Unmarshal(msg)
188+
r.ReleaseMsg(msg)
189+
if err != nil {
190+
return nil, err
191+
}
192+
193+
return newMessageFromProto(pb)
185194
}
186195

187196
func (m *impl) ToProtoV0() *pb.Message {
@@ -228,15 +237,29 @@ func (m *impl) ToProtoV1() *pb.Message {
228237
}
229238

230239
func (m *impl) ToNetV0(w io.Writer) error {
231-
pbw := ggio.NewDelimitedWriter(w)
232-
233-
return pbw.WriteMsg(m.ToProtoV0())
240+
return write(w, m.ToProtoV0())
234241
}
235242

236243
func (m *impl) ToNetV1(w io.Writer) error {
237-
pbw := ggio.NewDelimitedWriter(w)
244+
return write(w, m.ToProtoV1())
245+
}
246+
247+
func write(w io.Writer, m *pb.Message) error {
248+
size := m.Size()
249+
250+
buf := pool.Get(size + binary.MaxVarintLen64)
251+
defer pool.Put(buf)
252+
253+
n := binary.PutUvarint(buf, uint64(size))
254+
255+
written, err := m.MarshalTo(buf[n:])
256+
if err != nil {
257+
return err
258+
}
259+
n += written
238260

239-
return pbw.WriteMsg(m.ToProtoV1())
261+
_, err = w.Write(buf[:n])
262+
return err
240263
}
241264

242265
func (m *impl) Loggable() map[string]interface{} {

network/ipfs_impl.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
bsmsg "github.com/ipfs/go-bitswap/message"
1111
"github.com/libp2p/go-libp2p-core/helpers"
1212

13-
ggio "github.com/gogo/protobuf/io"
1413
cid "github.com/ipfs/go-cid"
1514
logging "github.com/ipfs/go-log"
1615
"github.com/libp2p/go-libp2p-core/connmgr"
@@ -19,6 +18,7 @@ import (
1918
"github.com/libp2p/go-libp2p-core/peer"
2019
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
2120
"github.com/libp2p/go-libp2p-core/routing"
21+
msgio "github.com/libp2p/go-msgio"
2222
ma "github.com/multiformats/go-multiaddr"
2323
)
2424

@@ -178,9 +178,9 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
178178
return
179179
}
180180

181-
reader := ggio.NewDelimitedReader(s, network.MessageSizeMax)
181+
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
182182
for {
183-
received, err := bsmsg.FromPBReader(reader)
183+
received, err := bsmsg.FromMsgReader(reader)
184184
if err != nil {
185185
if err != io.EOF {
186186
s.Reset()

0 commit comments

Comments
 (0)