4
4
"context"
5
5
"crypto/rand"
6
6
"encoding/hex"
7
+ "errors"
7
8
"fmt"
8
9
"io"
9
10
"net"
@@ -17,6 +18,7 @@ import (
17
18
"github.com/libp2p/go-libp2p/core/crypto"
18
19
"github.com/libp2p/go-libp2p/core/network"
19
20
"github.com/libp2p/go-libp2p/core/peer"
21
+ tpt "github.com/libp2p/go-libp2p/core/transport"
20
22
ma "github.com/multiformats/go-multiaddr"
21
23
manet "github.com/multiformats/go-multiaddr/net"
22
24
"github.com/multiformats/go-multibase"
@@ -867,3 +869,118 @@ func TestGenUfrag(t *testing.T) {
867
869
require .True (t , strings .HasPrefix (s , "libp2p+webrtc+v1/" ))
868
870
}
869
871
}
872
+
873
+ func TestManyConnections (t * testing.T ) {
874
+ var listeners []tpt.Listener
875
+ var listenerPeerIDs []peer.ID
876
+
877
+ const numListeners = 5
878
+ const dialersPerListener = 5
879
+ const connsPerDialer = 10
880
+ errCh := make (chan error , 10 * numListeners * dialersPerListener * connsPerDialer )
881
+ successCh := make (chan struct {}, 10 * numListeners * dialersPerListener * connsPerDialer )
882
+
883
+ for i := 0 ; i < numListeners ; i ++ {
884
+ tr , lp := getTransport (t )
885
+ listenerPeerIDs = append (listenerPeerIDs , lp )
886
+ ln , err := tr .Listen (ma .StringCast ("/ip4/127.0.0.1/udp/0/webrtc-direct" ))
887
+ require .NoError (t , err )
888
+ defer ln .Close ()
889
+ listeners = append (listeners , ln )
890
+ }
891
+
892
+ runListenConn := func (conn tpt.CapableConn ) {
893
+ defer conn .Close ()
894
+ s , err := conn .AcceptStream ()
895
+ if err != nil {
896
+ t .Errorf ("accept stream failed for listener: %s" , err )
897
+ errCh <- err
898
+ return
899
+ }
900
+ var b [4 ]byte
901
+ if _ , err := s .Read (b [:]); err != nil {
902
+ t .Errorf ("read stream failed for listener: %s" , err )
903
+ errCh <- err
904
+ return
905
+ }
906
+ s .Write (b [:])
907
+ _ , err = s .Read (b [:]) // peer will close the connection after read
908
+ if ! assert .Error (t , err ) {
909
+ err = errors .New ("invalid read: expected conn to close" )
910
+ errCh <- err
911
+ return
912
+ }
913
+ successCh <- struct {}{}
914
+ }
915
+
916
+ runDialConn := func (conn tpt.CapableConn ) {
917
+ defer conn .Close ()
918
+
919
+ s , err := conn .OpenStream (context .Background ())
920
+ if err != nil {
921
+ t .Errorf ("accept stream failed for listener: %s" , err )
922
+ errCh <- err
923
+ return
924
+ }
925
+ var b [4 ]byte
926
+ if _ , err := s .Write (b [:]); err != nil {
927
+ t .Errorf ("write stream failed for dialer: %s" , err )
928
+ errCh <- err
929
+ return
930
+ }
931
+ if _ , err := s .Read (b [:]); err != nil {
932
+ t .Errorf ("read stream failed for dialer: %s" , err )
933
+ errCh <- err
934
+ return
935
+ }
936
+ s .Close ()
937
+ }
938
+
939
+ runListener := func (ln tpt.Listener ) {
940
+ for i := 0 ; i < dialersPerListener * connsPerDialer ; i ++ {
941
+ conn , err := ln .Accept ()
942
+ if err != nil {
943
+ t .Errorf ("listener failed to accept conneciton: %s" , err )
944
+ return
945
+ }
946
+ go runListenConn (conn )
947
+ }
948
+ }
949
+
950
+ runDialer := func (ln tpt.Listener , lp peer.ID ) {
951
+ tp , _ := getTransport (t )
952
+ for i := 0 ; i < connsPerDialer ; i ++ {
953
+ // We want to test for deadlocks, set a high timeout
954
+ ctx , cancel := context .WithTimeout (context .Background (), 120 * time .Second )
955
+ conn , err := tp .Dial (ctx , ln .Multiaddr (), lp )
956
+ if err != nil {
957
+ t .Errorf ("dial failed: %s" , err )
958
+ errCh <- err
959
+ cancel ()
960
+ return
961
+ }
962
+ runDialConn (conn )
963
+ cancel ()
964
+ }
965
+ }
966
+
967
+ for i := 0 ; i < numListeners ; i ++ {
968
+ go runListener (listeners [i ])
969
+ }
970
+ for i := 0 ; i < numListeners ; i ++ {
971
+ for j := 0 ; j < dialersPerListener ; j ++ {
972
+ go runDialer (listeners [i ], listenerPeerIDs [i ])
973
+ }
974
+ }
975
+
976
+ for i := 0 ; i < numListeners * dialersPerListener * connsPerDialer ; i ++ {
977
+ select {
978
+ case <- successCh :
979
+ t .Log ("completed conn: " , i )
980
+ case err := <- errCh :
981
+ t .Fatalf ("failed: %s" , err )
982
+ case <- time .After (300 * time .Second ):
983
+ t .Fatalf ("timed out" )
984
+ }
985
+ }
986
+ }
0 commit comments