@@ -545,6 +545,60 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
545
545
}
546
546
}
547
547
548
+ func SubtestStreamLeftOpen (t * testing.T , tr network.Multiplexer ) {
549
+ a , b := tcpPipe (t )
550
+
551
+ const numStreams = 10
552
+ const dataLen = 50 * 1024
553
+
554
+ scopea := & peerScope {}
555
+ muxa , err := tr .NewConn (a , true , scopea )
556
+ checkErr (t , err )
557
+
558
+ scopeb := & peerScope {}
559
+ muxb , err := tr .NewConn (b , false , scopeb )
560
+ checkErr (t , err )
561
+
562
+ var wg sync.WaitGroup
563
+ wg .Add (1 + numStreams )
564
+ go func () {
565
+ defer wg .Done ()
566
+ for i := 0 ; i < numStreams ; i ++ {
567
+ stra , err := muxa .OpenStream (context .Background ())
568
+ checkErr (t , err )
569
+ go func () {
570
+ defer wg .Done ()
571
+ _ , err = stra .Write (randBuf (dataLen ))
572
+ checkErr (t , err )
573
+ // do NOT close or reset the stream
574
+ }()
575
+ }
576
+ }()
577
+
578
+ wg .Add (1 + numStreams )
579
+ go func () {
580
+ defer wg .Done ()
581
+ for i := 0 ; i < numStreams ; i ++ {
582
+ str , err := muxb .AcceptStream ()
583
+ checkErr (t , err )
584
+ go func () {
585
+ defer wg .Done ()
586
+ _ , err = io .ReadFull (str , make ([]byte , dataLen ))
587
+ checkErr (t , err )
588
+ }()
589
+ }
590
+ }()
591
+
592
+ // Now we have a bunch of open streams.
593
+ // Make sure that their memory is returned when we close the connection.
594
+ wg .Wait ()
595
+
596
+ muxa .Close ()
597
+ scopea .Check (t )
598
+ muxb .Close ()
599
+ scopeb .Check (t )
600
+ }
601
+
548
602
func SubtestStress1Conn1Stream1Msg (t * testing.T , tr network.Multiplexer ) {
549
603
SubtestStress (t , Options {
550
604
tr : tr ,
@@ -623,6 +677,7 @@ var subtests = []TransportTest{
623
677
SubtestStress1Conn100Stream100Msg10MB ,
624
678
SubtestStreamOpenStress ,
625
679
SubtestStreamReset ,
680
+ SubtestStreamLeftOpen ,
626
681
}
627
682
628
683
// SubtestAll runs all the stream multiplexer tests against the target
0 commit comments