Skip to content

Commit ad84c06

Browse files
committed
webrtc: add a stress test for many connections
1 parent 94b97b1 commit ad84c06

File tree

3 files changed

+125
-11
lines changed

3 files changed

+125
-11
lines changed

p2p/transport/webrtc/listener.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,8 @@ func (l *listener) Multiaddr() ma.Multiaddr {
329329
func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error {
330330
errC := make(chan error, 1)
331331
var once sync.Once
332-
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
333-
switch state {
332+
pc.OnConnectionStateChange(func(_ webrtc.PeerConnectionState) {
333+
switch pc.ConnectionState() {
334334
case webrtc.PeerConnectionStateConnected:
335335
once.Do(func() { close(errC) })
336336
case webrtc.PeerConnectionStateFailed:

p2p/transport/webrtc/transport.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ const (
7373
// timeout values for the peerconnection
7474
// https://github.com/pion/webrtc/blob/v3.1.50/settingengine.go#L102-L109
7575
const (
76-
DefaultDisconnectedTimeout = 20 * time.Second
77-
DefaultFailedTimeout = 30 * time.Second
78-
DefaultKeepaliveTimeout = 15 * time.Second
76+
DefaultDisconnectedTimeout = 100 * time.Second
77+
DefaultFailedTimeout = 300 * time.Second
78+
DefaultKeepaliveTimeout = 50 * time.Second
7979

8080
sctpReceiveBufferSize = 100_000
8181
)

p2p/transport/webrtc/transport_test.go

Lines changed: 120 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/libp2p/go-libp2p/core/crypto"
1818
"github.com/libp2p/go-libp2p/core/network"
1919
"github.com/libp2p/go-libp2p/core/peer"
20+
tpt "github.com/libp2p/go-libp2p/core/transport"
2021
ma "github.com/multiformats/go-multiaddr"
2122
manet "github.com/multiformats/go-multiaddr/net"
2223
"github.com/multiformats/go-multibase"
@@ -751,8 +752,8 @@ func TestTransportWebRTC_PeerConnectionDTLSFailed(t *testing.T) {
751752

752753
func TestConnectionTimeoutOnListener(t *testing.T) {
753754
tr, listeningPeer := getTransport(t)
754-
tr.peerConnectionTimeouts.Disconnect = 100 * time.Millisecond
755-
tr.peerConnectionTimeouts.Failed = 150 * time.Millisecond
755+
tr.peerConnectionTimeouts.Disconnect = 300 * time.Millisecond
756+
tr.peerConnectionTimeouts.Failed = 500 * time.Millisecond
756757
tr.peerConnectionTimeouts.Keepalive = 50 * time.Millisecond
757758

758759
listenMultiaddr := ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct")
@@ -770,17 +771,25 @@ func TestConnectionTimeoutOnListener(t *testing.T) {
770771

771772
tr1, connectingPeer := getTransport(t)
772773
go func() {
773-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
774+
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
774775
defer cancel()
775776
addr, err := manet.FromNetAddr(proxy.LocalAddr())
776-
require.NoError(t, err)
777+
if !assert.NoError(t, err) {
778+
ln.Close()
779+
return
780+
}
777781
_, webrtcComponent := ma.SplitFunc(ln.Multiaddr(), func(c ma.Component) bool { return c.Protocol().Code == ma.P_WEBRTC_DIRECT })
778782
addr = addr.Encapsulate(webrtcComponent)
779783
conn, err := tr1.Dial(ctx, addr, listeningPeer)
780-
require.NoError(t, err)
784+
if !assert.NoError(t, err) {
785+
ln.Close()
786+
return
787+
}
781788
t.Cleanup(func() { conn.Close() })
782789
str, err := conn.OpenStream(ctx)
783-
require.NoError(t, err)
790+
if !assert.NoError(t, err) {
791+
return
792+
}
784793
str.Write([]byte("foobar"))
785794
}()
786795

@@ -860,3 +869,108 @@ func TestMaxInFlightRequests(t *testing.T) {
860869
require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes")
861870
require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure")
862871
}
872+
873+
func TestStressConnectionCreation(t *testing.T) {
874+
var listeners []tpt.Listener
875+
var listenerPeerIDs []peer.ID
876+
877+
const numListeners = 10
878+
const dialersPerListener = 10
879+
const connsPerDialer = 10
880+
errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer)
881+
successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer)
882+
883+
for i := 0; i < numListeners; i++ {
884+
tr, lp := getTransport(t)
885+
listenerPeerIDs = append(listenerPeerIDs, lp)
886+
ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct"))
887+
require.NoError(t, err)
888+
defer ln.Close()
889+
listeners = append(listeners, ln)
890+
}
891+
892+
runListenConn := func(conn tpt.CapableConn) {
893+
s, err := conn.AcceptStream()
894+
if err != nil {
895+
t.Errorf("accept stream failed for listener: %s", err)
896+
errCh <- err
897+
return
898+
}
899+
var b [4]byte
900+
if _, err := s.Read(b[:]); err != nil {
901+
t.Errorf("read stream failed for listener: %s", err)
902+
errCh <- err
903+
return
904+
}
905+
s.Write(b[:])
906+
s.Read(b[:]) // peer will close the connection after read
907+
successCh <- struct{}{}
908+
}
909+
910+
runDialConn := func(conn tpt.CapableConn) {
911+
s, err := conn.OpenStream(context.Background())
912+
if err != nil {
913+
t.Errorf("accept stream failed for listener: %s", err)
914+
errCh <- err
915+
return
916+
}
917+
var b [4]byte
918+
if _, err := s.Write(b[:]); err != nil {
919+
t.Errorf("write stream failed for dialer: %s", err)
920+
}
921+
if _, err := s.Read(b[:]); err != nil {
922+
t.Errorf("read stream failed for dialer: %s", err)
923+
errCh <- err
924+
return
925+
}
926+
s.Close()
927+
}
928+
929+
runListener := func(ln tpt.Listener) {
930+
for i := 0; i < dialersPerListener*connsPerDialer; i++ {
931+
conn, err := ln.Accept()
932+
if err != nil {
933+
t.Errorf("listener failed to accept conneciton: %s", err)
934+
return
935+
}
936+
go runListenConn(conn)
937+
}
938+
}
939+
940+
runDialer := func(ln tpt.Listener, lp peer.ID) {
941+
tp, _ := getTransport(t)
942+
for i := 0; i < connsPerDialer; i++ {
943+
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
944+
conn, err := tp.Dial(ctx, ln.Multiaddr(), lp)
945+
if err != nil {
946+
t.Errorf("dial failed: %s", err)
947+
errCh <- err
948+
cancel()
949+
return
950+
}
951+
go runDialConn(conn)
952+
cancel()
953+
}
954+
}
955+
956+
for i := 0; i < numListeners; i++ {
957+
go runListener(listeners[i])
958+
}
959+
time.Sleep(10 * time.Second)
960+
for i := 0; i < numListeners; i++ {
961+
for j := 0; j < dialersPerListener; j++ {
962+
go runDialer(listeners[i], listenerPeerIDs[i])
963+
}
964+
}
965+
966+
for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ {
967+
select {
968+
case <-successCh:
969+
fmt.Println(i)
970+
case err := <-errCh:
971+
t.Fatalf("failed: %s", err)
972+
case <-time.After(300 * time.Second):
973+
t.Fatalf("timed out")
974+
}
975+
}
976+
}

0 commit comments

Comments
 (0)