Skip to content

Commit 24f7760

Browse files
committed
webrtc: add a stress test for connection creation
1 parent 1e2adf3 commit 24f7760

File tree

2 files changed

+108
-2
lines changed

2 files changed

+108
-2
lines changed

p2p/transport/webrtc/listener.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,8 @@ func (l *listener) Multiaddr() ma.Multiaddr {
329329
func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error {
330330
errC := make(chan error, 1)
331331
var once sync.Once
332-
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
333-
switch state {
332+
pc.OnConnectionStateChange(func(_ webrtc.PeerConnectionState) {
333+
switch pc.ConnectionState() {
334334
case webrtc.PeerConnectionStateConnected:
335335
once.Do(func() { close(errC) })
336336
case webrtc.PeerConnectionStateFailed:

p2p/transport/webrtc/transport_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/libp2p/go-libp2p/core/crypto"
1818
"github.com/libp2p/go-libp2p/core/network"
1919
"github.com/libp2p/go-libp2p/core/peer"
20+
tpt "github.com/libp2p/go-libp2p/core/transport"
2021
ma "github.com/multiformats/go-multiaddr"
2122
manet "github.com/multiformats/go-multiaddr/net"
2223
"github.com/multiformats/go-multibase"
@@ -860,3 +861,108 @@ func TestMaxInFlightRequests(t *testing.T) {
860861
require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes")
861862
require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure")
862863
}
864+
865+
func TestStressConnectionCreationNoDeadlock(t *testing.T) {
866+
var listeners []tpt.Listener
867+
var listenerPeerIDs []peer.ID
868+
869+
const numListeners = 10
870+
const dialersPerListener = 10
871+
const connsPerDialer = 3
872+
errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer)
873+
successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer)
874+
875+
for i := 0; i < numListeners; i++ {
876+
tr, lp := getTransport(t)
877+
listenerPeerIDs = append(listenerPeerIDs, lp)
878+
ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct"))
879+
require.NoError(t, err)
880+
defer ln.Close()
881+
listeners = append(listeners, ln)
882+
}
883+
884+
runListenConn := func(conn tpt.CapableConn) {
885+
s, err := conn.AcceptStream()
886+
if err != nil {
887+
t.Errorf("accept stream failed for listener: %s", err)
888+
errCh <- err
889+
return
890+
}
891+
var b [4]byte
892+
if _, err := s.Read(b[:]); err != nil {
893+
t.Errorf("read stream failed for listener: %s", err)
894+
errCh <- err
895+
return
896+
}
897+
s.Write(b[:])
898+
s.Read(b[:]) // peer will close the connection after read
899+
successCh <- struct{}{}
900+
}
901+
902+
runDialConn := func(conn tpt.CapableConn) {
903+
s, err := conn.OpenStream(context.Background())
904+
if err != nil {
905+
t.Errorf("accept stream failed for listener: %s", err)
906+
errCh <- err
907+
return
908+
}
909+
var b [4]byte
910+
if _, err := s.Write(b[:]); err != nil {
911+
t.Errorf("write stream failed for dialer: %s", err)
912+
}
913+
if _, err := s.Read(b[:]); err != nil {
914+
t.Errorf("read stream failed for dialer: %s", err)
915+
errCh <- err
916+
return
917+
}
918+
s.Close()
919+
}
920+
921+
runListener := func(ln tpt.Listener) {
922+
for i := 0; i < dialersPerListener*connsPerDialer; i++ {
923+
conn, err := ln.Accept()
924+
if err != nil {
925+
t.Errorf("listener failed to accept conneciton: %s", err)
926+
return
927+
}
928+
go runListenConn(conn)
929+
}
930+
}
931+
932+
runDialer := func(ln tpt.Listener, lp peer.ID) {
933+
tp, _ := getTransport(t)
934+
for i := 0; i < connsPerDialer; i++ {
935+
// This test aims to check for deadlocks. So keep a high timeout
936+
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
937+
conn, err := tp.Dial(ctx, ln.Multiaddr(), lp)
938+
if err != nil {
939+
t.Errorf("dial failed: %s", err)
940+
errCh <- err
941+
cancel()
942+
return
943+
}
944+
runDialConn(conn)
945+
cancel()
946+
}
947+
}
948+
949+
for i := 0; i < numListeners; i++ {
950+
go runListener(listeners[i])
951+
}
952+
for i := 0; i < numListeners; i++ {
953+
for j := 0; j < dialersPerListener; j++ {
954+
go runDialer(listeners[i], listenerPeerIDs[i])
955+
}
956+
}
957+
958+
for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ {
959+
select {
960+
case <-successCh:
961+
t.Log(i)
962+
case err := <-errCh:
963+
t.Fatalf("failed: %s", err)
964+
case <-time.After(300 * time.Second):
965+
t.Fatalf("timed out")
966+
}
967+
}
968+
}

0 commit comments

Comments
 (0)