Commit 7ae01fd

Merge pull request #52 from libp2p/peer-scope
add tests for memory management
2 parents 8480508 + a180f6e commit 7ae01fd


suites/mux/muxer_suite.go

Lines changed: 128 additions & 13 deletions
@@ -18,6 +18,7 @@ import (
 	"time"

 	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-testing/ci"

 	"github.com/stretchr/testify/require"
@@ -43,6 +44,42 @@ func getFunctionName(i interface{}) string {
 	return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
 }

+type peerScope struct {
+	mx     sync.Mutex
+	memory int
+}
+
+func (p *peerScope) ReserveMemory(size int, _ uint8) error {
+	fmt.Println("reserving", size)
+	p.mx.Lock()
+	p.memory += size
+	p.mx.Unlock()
+	return nil
+}
+
+func (p *peerScope) ReleaseMemory(size int) {
+	fmt.Println("releasing", size)
+	p.mx.Lock()
+	defer p.mx.Unlock()
+	if p.memory < size {
+		panic(fmt.Sprintf("tried to release too much memory: %d (current: %d)", size, p.memory))
+	}
+	p.memory -= size
+}
+
+// Check checks that we don't have any more reserved memory.
+func (p *peerScope) Check(t *testing.T) {
+	p.mx.Lock()
+	defer p.mx.Unlock()
+	require.Zero(t, p.memory, "expected all reserved memory to have been released")
+}
+
+func (p *peerScope) Stat() network.ScopeStat                        { return network.ScopeStat{} }
+func (p *peerScope) BeginSpan() (network.ResourceScopeSpan, error)  { return nil, nil }
+func (p *peerScope) Peer() peer.ID                                  { panic("implement me") }
+
+var _ network.PeerScope = &peerScope{}
+
 type Options struct {
 	tr      network.Multiplexer
 	connNum int
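The peerScope type added above is a test double for the network.PeerScope interface from go-libp2p-core: it only keeps a running balance of ReserveMemory/ReleaseMemory calls so that Check can assert that every byte a muxer reserved was released again. As a rough illustration of the other side of that contract, here is a minimal sketch (not part of this commit; the package, function name, buffer handling and priority value are assumptions) of how a muxer implementation might account a buffered frame against the scope it receives via NewConn:

package examplemuxer // hypothetical package, for illustration only

import "github.com/libp2p/go-libp2p-core/network"

// bufferFrame reserves memory for an incoming frame against the peer's
// resource scope, copies the frame, and returns a release callback.
// The name and the priority value (second argument) are illustrative.
func bufferFrame(scope network.PeerScope, frame []byte) ([]byte, func(), error) {
	if err := scope.ReserveMemory(len(frame), 128); err != nil {
		// Over the peer's memory limit: drop the frame or reset the stream.
		return nil, nil, err
	}
	buf := make([]byte, len(frame))
	copy(buf, frame)
	release := func() { scope.ReleaseMemory(len(buf)) }
	// The muxer calls release once the data is consumed, or when the stream
	// or connection is closed with data still buffered -- the situation the
	// new SubtestStreamLeftOpen test below exercises.
	return buf, release, nil
}

The tests below then pass a fresh peerScope into every tr.NewConn call and invoke scope.Check after closing the connection.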
@@ -141,9 +178,13 @@ func SubtestSimpleWrite(t *testing.T, tr network.Multiplexer) {
 	defer nc1.Close()

 	log("wrapping conn")
-	c1, err := tr.NewConn(nc1, false, nil)
+	scope := &peerScope{}
+	c1, err := tr.NewConn(nc1, false, scope)
 	checkErr(t, err)
-	defer c1.Close()
+	defer func() {
+		c1.Close()
+		scope.Check(t)
+	}()

 	// serve the outgoing conn, because some muxers assume
 	// that we _always_ call serve. (this is an error?)
@@ -253,7 +294,8 @@ func SubtestStress(t *testing.T, opt Options) {
 			return
 		}

-		c, err := opt.tr.NewConn(nc, false, nil)
+		scope := &peerScope{}
+		c, err := opt.tr.NewConn(nc, false, scope)
 		if err != nil {
 			t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
 			return
@@ -282,6 +324,7 @@ func SubtestStress(t *testing.T, opt Options) {
 		}
 		wg.Wait()
 		c.Close()
+		scope.Check(t)
 	}

 	openConnsAndRW := func() {
@@ -375,10 +418,15 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
 		}
 	}()

-	muxb, err := tr.NewConn(b, false, nil)
+	scope := &peerScope{}
+	muxb, err := tr.NewConn(b, false, scope)
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer func() {
+		muxb.Close()
+		scope.Check(t)
+	}()

 	time.Sleep(time.Millisecond * 50)

@@ -391,7 +439,9 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
 			if err != nil {
 				break
 			}
+			wg.Add(1)
 			go func() {
+				defer wg.Done()
 				str.Close()
 				select {
 				case recv <- struct{}{}:
@@ -415,6 +465,8 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
 			t.Fatal("timed out receiving streams")
 		}
 	}
+
+	wg.Wait()
 }

 func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
@@ -428,7 +480,8 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		muxa, err := tr.NewConn(a, true, nil)
+		scope := &peerScope{}
+		muxa, err := tr.NewConn(a, true, scope)
 		if err != nil {
 			t.Error(err)
 			return
@@ -444,14 +497,19 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
 		if err != network.ErrReset {
 			t.Error("should have been stream reset")
 		}
-
 		s.Close()
+		scope.Check(t)
 	}()

-	muxb, err := tr.NewConn(b, false, nil)
+	scope := &peerScope{}
+	muxb, err := tr.NewConn(b, false, scope)
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer func() {
+		muxb.Close()
+		scope.Check(t)
+	}()

 	str, err := muxb.AcceptStream()
 	checkErr(t, err)
@@ -464,16 +522,18 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
 func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
 	a, b := tcpPipe(t)

-	muxa, err := tr.NewConn(a, true, nil)
+	scopea := &peerScope{}
+	muxa, err := tr.NewConn(a, true, scopea)
 	checkErr(t, err)

-	muxb, err := tr.NewConn(b, false, nil)
+	scopeb := &peerScope{}
+	muxb, err := tr.NewConn(b, false, scopeb)
 	checkErr(t, err)

-	err = muxa.Close()
-	checkErr(t, err)
-	err = muxb.Close()
-	checkErr(t, err)
+	checkErr(t, muxa.Close())
+	scopea.Check(t)
+	checkErr(t, muxb.Close())
+	scopeb.Check(t)

 	// make sure the underlying net.Conn was closed
 	if _, err := a.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") {
@@ -484,6 +544,60 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
 	}
 }

+func SubtestStreamLeftOpen(t *testing.T, tr network.Multiplexer) {
+	a, b := tcpPipe(t)
+
+	const numStreams = 10
+	const dataLen = 50 * 1024
+
+	scopea := &peerScope{}
+	muxa, err := tr.NewConn(a, true, scopea)
+	checkErr(t, err)
+
+	scopeb := &peerScope{}
+	muxb, err := tr.NewConn(b, false, scopeb)
+	checkErr(t, err)
+
+	var wg sync.WaitGroup
+	wg.Add(1 + numStreams)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < numStreams; i++ {
+			stra, err := muxa.OpenStream(context.Background())
+			checkErr(t, err)
+			go func() {
+				defer wg.Done()
+				_, err = stra.Write(randBuf(dataLen))
+				checkErr(t, err)
+				// do NOT close or reset the stream
+			}()
+		}
+	}()
+
+	wg.Add(1 + numStreams)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < numStreams; i++ {
+			str, err := muxb.AcceptStream()
+			checkErr(t, err)
+			go func() {
+				defer wg.Done()
+				_, err = io.ReadFull(str, make([]byte, dataLen))
+				checkErr(t, err)
+			}()
+		}
+	}()
+
+	// Now we have a bunch of open streams.
+	// Make sure that their memory is returned when we close the connection.
+	wg.Wait()
+
+	muxa.Close()
+	scopea.Check(t)
+	muxb.Close()
+	scopeb.Check(t)
+}
+
 func SubtestStress1Conn1Stream1Msg(t *testing.T, tr network.Multiplexer) {
 	SubtestStress(t, Options{
 		tr: tr,
@@ -562,6 +676,7 @@ var subtests = []TransportTest{
 	SubtestStress1Conn100Stream100Msg10MB,
 	SubtestStreamOpenStress,
 	SubtestStreamReset,
+	SubtestStreamLeftOpen,
 }

 // SubtestAll runs all the stream multiplexer tests against the target
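This suite is consumed by muxer implementations rather than run on its own. Below is a minimal sketch of wiring it up, assuming the package exports a SubtestAll(t, tr) entry point as the trailing doc comment suggests; the consumer package and the newTransport helper are hypothetical placeholders, not part of this commit:

package examplemuxer_test // hypothetical consumer of the suite

import (
	"testing"

	"github.com/libp2p/go-libp2p-core/network"
	mux "github.com/libp2p/go-libp2p-testing/suites/mux"
)

// newTransport is a placeholder for the network.Multiplexer implementation
// under test (e.g. a yamux or mplex transport).
func newTransport() network.Multiplexer {
	panic("return your multiplexer implementation here")
}

func TestMuxerSuite(t *testing.T) {
	// Runs every subtest in the suite, including the new SubtestStreamLeftOpen,
	// against the implementation returned by newTransport.
	mux.SubtestAll(t, newTransport())
}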
