From e108b396237d359307c30f7e42197af00992ed6b Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Wed, 21 Aug 2019 15:16:17 +0200 Subject: [PATCH 1/5] use the manet dial args in order to figure out the listener --- listener.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/listener.go b/listener.go index 46534fc..386bebf 100644 --- a/listener.go +++ b/listener.go @@ -21,8 +21,12 @@ type Listener struct { } func newListener(config *connConfig) (*Listener, error) { + lnet, lnaddr, err := manet.DialArgs(config.maAddr) + if err != nil { + return nil, err + } - ln, err := net.Listen(config.addr.Network(), config.addr.String()) + ln, err := net.Listen(lnet, lnaddr) if err != nil { return nil, fmt.Errorf("failed to listen: %v", err) } From e087f7ebc9ab4788baa9981d380f84e8272f6ece Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Thu, 22 Aug 2019 18:18:44 +0200 Subject: [PATCH 2/5] working config with upgrader --- conn.go | 263 +++++++++++++++++++++++------------------------- go.mod | 1 + go.sum | 13 +++ listener.go | 18 ++-- transport.go | 26 +++-- webrtcdirect.go | 9 +- 6 files changed, 177 insertions(+), 153 deletions(-) diff --git a/conn.go b/conn.go index 37aeb65..c7b89e4 100644 --- a/conn.go +++ b/conn.go @@ -13,7 +13,6 @@ import ( "time" ic "github.com/libp2p/go-libp2p-core/crypto" - smux "github.com/libp2p/go-libp2p-core/mux" peer "github.com/libp2p/go-libp2p-core/peer" tpt "github.com/libp2p/go-libp2p-core/transport" ma "github.com/multiformats/go-multiaddr" @@ -30,6 +29,16 @@ type connConfig struct { remoteID peer.ID } +func (c *connConfig) CopyWithNewRemoteID(remoteID peer.ID) *connConfig { + return &connConfig{ + transport: c.transport, + maAddr: c.maAddr, + addr: c.addr, + isServer: c.isServer, + remoteID: remoteID, + } +} + func newConnConfig(transport *Transport, maAddr ma.Multiaddr, isServer bool) (*connConfig, error) { httpMa := maAddr.Decapsulate(webrtcma) @@ -52,21 +61,25 @@ type Conn struct { config *connConfig peerConnection *webrtc.PeerConnection - initChannel datachannel.ReadWriteCloser + channel datachannel.ReadWriteCloser + + lock sync.RWMutex + accept chan chan detachResult + addr net.Addr - lock sync.RWMutex - accept chan chan detachResult - isMuxed bool - muxedConn smux.MuxedConn + buf []byte + bufStart int + bufEnd int } func newConn(config *connConfig, pc *webrtc.PeerConnection, initChannel datachannel.ReadWriteCloser) *Conn { conn := &Conn{ config: config, peerConnection: pc, - initChannel: initChannel, + channel: initChannel, accept: make(chan chan detachResult), - isMuxed: config.transport.muxer != nil, + buf: make([]byte, dcWrapperBufSize), + addr: config.addr, } pc.OnDataChannel(func(dc *webrtc.DataChannel) { @@ -175,6 +188,10 @@ func (c *Conn) Close() error { close(c.accept) + newErr := c.channel.Close() + if err == nil { + err = newErr + } return err } @@ -187,39 +204,6 @@ func (c *Conn) IsClosed() bool { return pc == nil } -// OpenStream creates a new stream. -func (c *Conn) OpenStream() (smux.MuxedStream, error) { - muxed, err := c.getMuxed() - if err != nil { - return nil, err - } - if muxed != nil { - return muxed.OpenStream() - } - - rawDC := c.checkInitChannel() - if rawDC == nil { - pc, err := c.getPC() - if err != nil { - return nil, err - } - dc, err := pc.CreateDataChannel("data", nil) - if err != nil { - return nil, err - } - - detachRes := detachChannel(dc) - - res := <-detachRes - if res.err != nil { - return nil, res.err - } - rawDC = res.dc - } - - return newStream(rawDC), nil -} - func (c *Conn) getPC() (*webrtc.PeerConnection, error) { c.lock.RLock() pc := c.peerConnection @@ -232,78 +216,60 @@ func (c *Conn) getPC() (*webrtc.PeerConnection, error) { return pc, nil } -func (c *Conn) getMuxed() (smux.MuxedConn, error) { - c.lock.Lock() - defer c.lock.Unlock() +// func (c *Conn) getMuxed() (smux.MuxedConn, error) { +// c.lock.Lock() +// defer c.lock.Unlock() - if !c.isMuxed { - return nil, nil - } +// if !c.isMuxed { +// return nil, nil +// } - if c.muxedConn != nil { - return c.muxedConn, nil - } +// if c.muxedConn != nil { +// return c.muxedConn, nil +// } - rawDC := c.initChannel - if rawDC == nil { - var err error - rawDC, err = c.awaitAccept() - if err != nil { - return nil, err - } - } +// rawDC := c.initChannel +// if rawDC == nil { +// var err error +// rawDC, err = c.awaitAccept() +// if err != nil { +// return nil, err +// } +// } - err := c.useMuxer(&dcWrapper{channel: rawDC, addr: c.config.addr, buf: make([]byte, dcWrapperBufSize)}, c.config.transport.muxer) - if err != nil { - return nil, err - } +// err := c.useMuxer(&dcWrapper{channel: rawDC, addr: c.config.addr, buf: make([]byte, dcWrapperBufSize)}, c.config.transport.muxer) +// if err != nil { +// return nil, err +// } - return c.muxedConn, nil -} +// return c.muxedConn, nil +// } // Note: caller should hold the conn lock. -func (c *Conn) useMuxer(conn net.Conn, muxer smux.Multiplexer) error { - muxed, err := muxer.NewConn(conn, c.config.isServer) - if err != nil { - return err - } - c.muxedConn = muxed - - return nil -} - -func (c *Conn) checkInitChannel() datachannel.ReadWriteCloser { - c.lock.Lock() - defer c.lock.Unlock() - // Since a WebRTC offer can't be empty the offering side will have - // an initial data channel opened. We return it here, the first time - // OpenStream is called. - if c.initChannel != nil { - ch := c.initChannel - c.initChannel = nil - return ch - } - - return nil -} - -// AcceptStream accepts a stream opened by the other side. -func (c *Conn) AcceptStream() (smux.MuxedStream, error) { - muxed, err := c.getMuxed() - if err != nil { - return nil, err - } - if muxed != nil { - return muxed.AcceptStream() - } - - rawDC := c.checkInitChannel() - if rawDC == nil { - rawDC, err = c.awaitAccept() - } - - return newStream(rawDC), nil -} +// func (c *Conn) useMuxer(conn net.Conn, muxer smux.Multiplexer) error { +// muxed, err := muxer.NewConn(conn, c.config.isServer) +// if err != nil { +// return err +// } +// c.muxedConn = muxed + +// return nil +// } + +// func (c *Conn) checkInitChannel() datachannel.ReadWriteCloser { +// c.lock.Lock() +// defer c.lock.Unlock() +// // Since a WebRTC offer can't be empty the offering side will have +// // an initial data channel opened. We return it here, the first time +// // OpenStream is called. +// if c.initChannel != nil { +// ch := c.initChannel +// c.initChannel = nil +// return ch +// } + +// return nil +// } func (c *Conn) awaitAccept() (datachannel.ReadWriteCloser, error) { detachRes, ok := <-c.accept @@ -312,11 +278,15 @@ func (c *Conn) awaitAccept() (datachannel.ReadWriteCloser, error) { } res := <-detachRes + if res.err == nil { + c.channel = res.dc + } return res.dc, res.err } // LocalPeer returns our peer ID func (c *Conn) LocalPeer() peer.ID { + log.Debugf("local peer called, returning: %s", c.config.transport.localID) // TODO: Base on WebRTC security? return c.config.transport.localID } @@ -330,6 +300,7 @@ func (c *Conn) LocalPrivateKey() ic.PrivKey { // RemotePeer returns the peer ID of the remote peer. func (c *Conn) RemotePeer() peer.ID { + log.Debugf("conn remote peer: %s", c.config.remoteID) // TODO: Base on WebRTC security? return c.config.remoteID } @@ -349,6 +320,7 @@ func (c *Conn) LocalMultiaddr() ma.Multiaddr { // RemoteMultiaddr returns the remote Multiaddr associated // with this connection func (c *Conn) RemoteMultiaddr() ma.Multiaddr { + log.Debugf("remote maddr: ", c.config.maAddr.String()) return c.config.maAddr } @@ -362,65 +334,78 @@ func (c *Conn) Transport() tpt.Transport { const dcWrapperBufSize = math.MaxUint16 // dcWrapper wraps datachannel.ReadWriteCloser to form a net.Conn -type dcWrapper struct { - channel datachannel.ReadWriteCloser - addr net.Addr - - buf []byte - bufStart int - bufEnd int -} - -func (w *dcWrapper) Read(p []byte) (int, error) { +// type dcWrapper struct { +// channel datachannel.ReadWriteCloser +// addr net.Addr + +// buf []byte +// bufStart int +// bufEnd int +// } + +func (c *Conn) Read(p []byte) (int, error) { + if c.channel == nil { + c.awaitAccept() + } var err error - if w.bufEnd == 0 { + if c.bufEnd == 0 { n := 0 - n, err = w.channel.Read(w.buf) - w.bufEnd = n + n, err = c.channel.Read(c.buf) + c.bufEnd = n } n := 0 - if w.bufEnd-w.bufStart > 0 { - n = copy(p, w.buf[w.bufStart:w.bufEnd]) - w.bufStart += n + if c.bufEnd-c.bufStart > 0 { + n = copy(p, c.buf[c.bufStart:c.bufEnd]) + c.bufStart += n - if w.bufStart >= w.bufEnd { - w.bufStart = 0 - w.bufEnd = 0 + if c.bufStart >= c.bufEnd { + c.bufStart = 0 + c.bufEnd = 0 } } + log.Debugf("read finished: %s", string(p)) return n, err } -func (w *dcWrapper) Write(p []byte) (n int, err error) { +func (c *Conn) Write(p []byte) (n int, err error) { + if c.channel == nil { + log.Debugf("awaiting accept") + _, err := c.awaitAccept() + if err != nil { + return 0, fmt.Errorf("error awaiting accept: %v", err) + } + } if len(p) > dcWrapperBufSize { - return w.channel.Write(p[:dcWrapperBufSize]) + log.Debugf("write: %v", p[:dcWrapperBufSize]) + return c.channel.Write(p[:dcWrapperBufSize]) } - return w.channel.Write(p) -} - -func (w *dcWrapper) Close() error { - return w.channel.Close() + log.Debugf("write: (%v) %s", p, string(p)) + num, err := c.channel.Write(p) + if err != nil { + log.Errorf("error writing: %v", err) + } + return num, err } -func (w *dcWrapper) LocalAddr() net.Addr { - return w.addr +func (c *Conn) LocalAddr() net.Addr { + return c.addr } -func (w *dcWrapper) RemoteAddr() net.Addr { - return w.addr +func (c *Conn) RemoteAddr() net.Addr { + return c.addr } -func (w *dcWrapper) SetDeadline(t time.Time) error { +func (c *Conn) SetDeadline(t time.Time) error { return nil } -func (w *dcWrapper) SetReadDeadline(t time.Time) error { +func (c *Conn) SetReadDeadline(t time.Time) error { return nil } -func (w *dcWrapper) SetWriteDeadline(t time.Time) error { +func (c *Conn) SetWriteDeadline(t time.Time) error { return nil } diff --git a/go.mod b/go.mod index 2523c25..d436426 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/libp2p/go-libp2p-core v0.0.9 github.com/libp2p/go-libp2p-mplex v0.2.1 github.com/libp2p/go-libp2p-testing v0.0.4 + github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr-fmt v0.0.1 github.com/multiformats/go-multiaddr-net v0.0.1 diff --git a/go.sum b/go.sum index f90504f..59fa570 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9 github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574 h1:Pxjl8Wn3cCU7nB/MCmPEUMbjMHxXFqODW6rce0jpxB4= github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574/go.mod h1:gynVu6LUw+xMXD3XEvjHQcIbJkWEamnGjJDebRHqTd0= +github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A= +github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= @@ -62,14 +64,21 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8 github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.0.9 h1:Dt0Glhajkwj1zMYRoY0nbVBI7pyRYNLDaKCwss2Jd4I= github.com/libp2p/go-libp2p-core v0.0.9/go.mod h1:0d9xmaYAVY5qmbp/fcgxHT3ZJsLjYeYPMJAUKpaCHrE= +github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= github.com/libp2p/go-libp2p-mplex v0.2.1 h1:E1xaJBQnbSiTHGI1gaBKmKhu1TUKkErKJnE8iGvirYI= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= github.com/libp2p/go-libp2p-testing v0.0.3 h1:bdij4bKaaND7tCsaXVjRfYkMpvoOeKj9AVQGJllA6jM= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.4 h1:Qev57UR47GcLPXWjrunv5aLIQGO4n9mhI/8/EIrEEFc= github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= +github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 h1:PZMS9lhjK9VytzMCW3tWHAXtKXmlURSc3ZdvwEcKCzw= +github.com/libp2p/go-libp2p-transport-upgrader v0.1.1/go.mod h1:IEtA6or8JUbsV07qPW4r01GnTenLW4oi3lOPbUMGJJA= +github.com/libp2p/go-maddr-filter v0.0.4 h1:hx8HIuuwk34KePddrp2mM5ivgPkZ09JH4AvsALRbFUs= +github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q= +github.com/libp2p/go-mplex v0.0.3/go.mod h1:pK5yMLmOoBR1pNCqDlA2GQrdAVTMkqFalaTWe7l4Yd0= github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= +github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14= github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc= github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw= github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA= @@ -115,8 +124,12 @@ github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcM github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pion/datachannel v1.4.5 h1:paz18kYAetpTdK8tlMAtDY+Ayxrv5fndZ5XPZwiZHrU= diff --git a/listener.go b/listener.go index 386bebf..ade864c 100644 --- a/listener.go +++ b/listener.go @@ -7,7 +7,8 @@ import ( "net" "net/http" - tpt "github.com/libp2p/go-libp2p-core/transport" + peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" ) @@ -26,6 +27,7 @@ func newListener(config *connConfig) (*Listener, error) { return nil, err } + log.Debug("newListener %s : %s", lnet, lnaddr) ln, err := net.Listen(lnet, lnaddr) if err != nil { return nil, fmt.Errorf("failed to listen: %v", err) @@ -45,7 +47,7 @@ func newListener(config *connConfig) (*Listener, error) { l := &Listener{ config: config, - accept: make(chan *Conn), + accept: make(chan *Conn, 1), } mux := http.NewServeMux() @@ -56,6 +58,7 @@ func newListener(config *connConfig) (*Listener, error) { } go func() { + log.Debug("srv serving") srvErr := srv.Serve(ln) if srvErr != nil { log.Warningf("failed to start server: %v", srvErr) @@ -67,6 +70,7 @@ func newListener(config *connConfig) (*Listener, error) { } func (l *Listener) handler(w http.ResponseWriter, r *http.Request) { + log.Debugf("handler") r.ParseForm() signals, ok := r.Form["signal"] if !ok || len(signals) != 1 { @@ -90,6 +94,7 @@ func (l *Listener) handler(w http.ResponseWriter, r *http.Request) { } func (l *Listener) handleSignal(offerStr string) (string, error) { + log.Debugf("handle signal") offer, err := decodeSignal(offerStr) if err != nil { return "", fmt.Errorf("failed to decode offer: %v", err) @@ -120,20 +125,21 @@ func (l *Listener) handleSignal(offerStr string) (string, error) { return "", fmt.Errorf("failed to encode answer: %v", err) } - c := newConn(l.config, pc, nil) + c := newConn(l.config.CopyWithNewRemoteID(peer.ID(offer.SDP)), pc, nil) l.accept <- c - + log.Debug("signal handled") return answerEnc, nil } // Accept waits for and returns the next connection to the listener. -func (l *Listener) Accept() (tpt.CapableConn, error) { +func (l *Listener) Accept() (transport.CapableConn, error) { conn, ok := <-l.accept if !ok { return nil, errors.New("Listener closed") } + log.Debugf("Accept filters: %v, addr: %s", l.config.transport.Upgrader, conn.RemoteMultiaddr()) - return conn, nil + return l.config.transport.Upgrader.UpgradeInbound(context.Background(), l.config.transport, conn) } // Close closes the listener. diff --git a/transport.go b/transport.go index d6230fc..ce64444 100644 --- a/transport.go +++ b/transport.go @@ -6,7 +6,8 @@ import ( smux "github.com/libp2p/go-libp2p-core/mux" peer "github.com/libp2p/go-libp2p-core/peer" - tpt "github.com/libp2p/go-libp2p-core/transport" + "github.com/libp2p/go-libp2p-core/transport" + tptu "github.com/libp2p/go-libp2p-transport-upgrader" ma "github.com/multiformats/go-multiaddr" mafmt "github.com/multiformats/go-multiaddr-fmt" "github.com/pion/webrtc/v2" @@ -15,9 +16,23 @@ import ( // Transport is the WebRTC transport. type Transport struct { webrtcOptions webrtc.Configuration - muxer smux.Multiplexer localID peer.ID api *webrtc.API + Upgrader *tptu.Upgrader +} + +func NewWebRtcTransport(upgrader *tptu.Upgrader) *Transport { + s := webrtc.SettingEngine{} + // Use Detach data channels mode + s.DetachDataChannels() + + api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) + return &Transport{ + webrtcOptions: webrtc.Configuration{}, + localID: peer.ID(1), + api: api, + Upgrader: upgrader, + } } // NewTransport creates a WebRTC transport that signals over a direct HTTP connection. @@ -30,7 +45,6 @@ func NewTransport(webrtcOptions webrtc.Configuration, muxer smux.Multiplexer) *T api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) return &Transport{ webrtcOptions: webrtcOptions, - muxer: muxer, // TODO: Make the muxer optional localID: peer.ID(1), api: api, } @@ -43,7 +57,7 @@ func (t *Transport) CanDial(addr ma.Multiaddr) bool { } // Dial dials the peer at the remote address. -func (t *Transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.CapableConn, error) { +func (t *Transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) { if !t.CanDial(raddr) { return nil, fmt.Errorf("can't dial address %s", raddr) } @@ -60,11 +74,11 @@ func (t *Transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp return nil, fmt.Errorf("failed to create connection: %v", err) } - return conn, nil + return t.Upgrader.UpgradeOutbound(ctx, t, conn, p) } // Listen listens on the given multiaddr. -func (t *Transport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { +func (t *Transport) Listen(laddr ma.Multiaddr) (transport.Listener, error) { if !t.CanDial(laddr) { return nil, fmt.Errorf("can't listen on address %s", laddr) } diff --git a/webrtcdirect.go b/webrtcdirect.go index 1ee7b00..109d038 100644 --- a/webrtcdirect.go +++ b/webrtcdirect.go @@ -9,11 +9,16 @@ import ( var log = logging.Logger("webrtcdirect-tpt") +func init() { + log.Debug("hello!") +} + var webrtcma, _ = ma.NewMultiaddr("/p2p-webrtc-direct") var httpma, _ = ma.NewMultiaddr("/http") var _ tpt.Transport = &Transport{} -var _ tpt.CapableConn = &Conn{} -var _ tpt.Listener = &Listener{} + +// var _ tpt.CapableConn = &Conn{} +// var _ tpt.Listener = &Listener{} var _ smux.MuxedStream = &Stream{} From 2d63991cefc45c03d829c5fdf219ee7744dba8d0 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 23 Aug 2019 10:47:51 +0200 Subject: [PATCH 3/5] passing tests --- conn.go | 97 +++++++------------------------------------- go.mod | 4 +- go.sum | 40 +++++++++++++++++- listener.go | 12 ++++-- transport.go | 23 +++-------- transport_test.go | 16 ++++---- webrtcdirect.go | 3 -- webrtcdirect_test.go | 65 ++++++++++++++--------------- 8 files changed, 112 insertions(+), 148 deletions(-) diff --git a/conn.go b/conn.go index c7b89e4..dd322db 100644 --- a/conn.go +++ b/conn.go @@ -186,8 +186,6 @@ func (c *Conn) Close() error { } c.peerConnection = nil - close(c.accept) - newErr := c.channel.Close() if err == nil { err = newErr @@ -216,72 +214,24 @@ func (c *Conn) getPC() (*webrtc.PeerConnection, error) { return pc, nil } -// func (c *Conn) getMuxed() (smux.MuxedConn, error) { -// c.lock.Lock() -// defer c.lock.Unlock() - -// if !c.isMuxed { -// return nil, nil -// } - -// if c.muxedConn != nil { -// return c.muxedConn, nil -// } - -// rawDC := c.initChannel -// if rawDC == nil { -// var err error -// rawDC, err = c.awaitAccept() -// if err != nil { -// return nil, err -// } -// } - -// err := c.useMuxer(&dcWrapper{channel: rawDC, addr: c.config.addr, buf: make([]byte, dcWrapperBufSize)}, c.config.transport.muxer) -// if err != nil { -// return nil, err -// } - -// return c.muxedConn, nil -// } - -// Note: caller should hold the conn lock. -// func (c *Conn) useMuxer(conn net.Conn, muxer smux.Multiplexer) error { -// muxed, err := muxer.NewConn(conn, c.config.isServer) -// if err != nil { -// return err -// } -// c.muxedConn = muxed - -// return nil -// } - -// func (c *Conn) checkInitChannel() datachannel.ReadWriteCloser { -// c.lock.Lock() -// defer c.lock.Unlock() -// // Since a WebRTC offer can't be empty the offering side will have -// // an initial data channel opened. We return it here, the first time -// // OpenStream is called. -// if c.initChannel != nil { -// ch := c.initChannel -// c.initChannel = nil -// return ch -// } - -// return nil -// } - -func (c *Conn) awaitAccept() (datachannel.ReadWriteCloser, error) { +func (c *Conn) awaitAccept(timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() detachRes, ok := <-c.accept if !ok { - return nil, errors.New("Conn closed") + return errors.New("Conn closed") } - res := <-detachRes - if res.err == nil { - c.channel = res.dc + select { + case res := <-detachRes: + if res.err == nil { + c.channel = res.dc + } + return res.err + case <-timer.C: + return errors.New("timeout awaiting webrtc accept") } - return res.dc, res.err + return fmt.Errorf("awaitAccept errored in a strange way and skipped a select") } // LocalPeer returns our peer ID @@ -333,19 +283,9 @@ func (c *Conn) Transport() tpt.Transport { // packetizing strategy. const dcWrapperBufSize = math.MaxUint16 -// dcWrapper wraps datachannel.ReadWriteCloser to form a net.Conn -// type dcWrapper struct { -// channel datachannel.ReadWriteCloser -// addr net.Addr - -// buf []byte -// bufStart int -// bufEnd int -// } - func (c *Conn) Read(p []byte) (int, error) { if c.channel == nil { - c.awaitAccept() + return 0, errors.New("channel is not open") } var err error @@ -365,24 +305,17 @@ func (c *Conn) Read(p []byte) (int, error) { c.bufEnd = 0 } } - log.Debugf("read finished: %s", string(p)) return n, err } func (c *Conn) Write(p []byte) (n int, err error) { if c.channel == nil { - log.Debugf("awaiting accept") - _, err := c.awaitAccept() - if err != nil { - return 0, fmt.Errorf("error awaiting accept: %v", err) - } + return 0, errors.New("channel is not open") } if len(p) > dcWrapperBufSize { - log.Debugf("write: %v", p[:dcWrapperBufSize]) return c.channel.Write(p[:dcWrapperBufSize]) } - log.Debugf("write: (%v) %s", p, string(p)) num, err := c.channel.Write(p) if err != nil { log.Errorf("error writing: %v", err) diff --git a/go.mod b/go.mod index d436426..c28d39c 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.12 require ( github.com/ipfs/go-log v0.0.1 github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574 - github.com/libp2p/go-libp2p-core v0.0.9 + github.com/libp2p/go-libp2p-core v0.2.0 github.com/libp2p/go-libp2p-mplex v0.2.1 + github.com/libp2p/go-libp2p-peer v0.2.0 + github.com/libp2p/go-libp2p-secio v0.2.0 github.com/libp2p/go-libp2p-testing v0.0.4 github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 github.com/multiformats/go-multiaddr v0.0.4 diff --git a/go.sum b/go.sum index 59fa570..d96809e 100644 --- a/go.sum +++ b/go.sum @@ -53,20 +53,37 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libp2p/go-buffer-pool v0.0.1 h1:9Rrn/H46cXjaA2HQ5Y8lyhOS1NhTkZ4yuEs2r3Eechg= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= +github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= +github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= -github.com/libp2p/go-libp2p-core v0.0.9 h1:Dt0Glhajkwj1zMYRoY0nbVBI7pyRYNLDaKCwss2Jd4I= -github.com/libp2p/go-libp2p-core v0.0.9/go.mod h1:0d9xmaYAVY5qmbp/fcgxHT3ZJsLjYeYPMJAUKpaCHrE= +github.com/libp2p/go-libp2p-core v0.2.0 h1:ycFtuNwtZBAJSxzaHbyv6NjG3Yj5Nmra1csHaQ3zwaw= +github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI= +github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= +github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= github.com/libp2p/go-libp2p-mplex v0.2.1 h1:E1xaJBQnbSiTHGI1gaBKmKhu1TUKkErKJnE8iGvirYI= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= +github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY= +github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY= +github.com/libp2p/go-libp2p-secio v0.2.0 h1:ywzZBsWEEz2KNTn5RtzauEDq5RFEefPsttXYwAWqHng= +github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g= +github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3 h1:bdij4bKaaND7tCsaXVjRfYkMpvoOeKj9AVQGJllA6jM= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.4 h1:Qev57UR47GcLPXWjrunv5aLIQGO4n9mhI/8/EIrEEFc= @@ -78,15 +95,21 @@ github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9 github.com/libp2p/go-mplex v0.0.3/go.mod h1:pK5yMLmOoBR1pNCqDlA2GQrdAVTMkqFalaTWe7l4Yd0= github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= +github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA= +github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14= github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc= github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw= +github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA= github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk= +github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ= @@ -175,19 +198,26 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= +github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU= @@ -220,6 +250,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -235,6 +266,7 @@ golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -244,8 +276,12 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= +gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= diff --git a/listener.go b/listener.go index ade864c..c359b8a 100644 --- a/listener.go +++ b/listener.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "time" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/transport" @@ -27,7 +28,6 @@ func newListener(config *connConfig) (*Listener, error) { return nil, err } - log.Debug("newListener %s : %s", lnet, lnaddr) ln, err := net.Listen(lnet, lnaddr) if err != nil { return nil, fmt.Errorf("failed to listen: %v", err) @@ -126,7 +126,14 @@ func (l *Listener) handleSignal(offerStr string) (string, error) { } c := newConn(l.config.CopyWithNewRemoteID(peer.ID(offer.SDP)), pc, nil) - l.accept <- c + go func() { + err := c.awaitAccept(30 * time.Second) + if err != nil { + log.Warningf("error awaiting channel accept: %v", err) + return + } + l.accept <- c + }() log.Debug("signal handled") return answerEnc, nil } @@ -137,7 +144,6 @@ func (l *Listener) Accept() (transport.CapableConn, error) { if !ok { return nil, errors.New("Listener closed") } - log.Debugf("Accept filters: %v, addr: %s", l.config.transport.Upgrader, conn.RemoteMultiaddr()) return l.config.transport.Upgrader.UpgradeInbound(context.Background(), l.config.transport, conn) } diff --git a/transport.go b/transport.go index ce64444..89e080b 100644 --- a/transport.go +++ b/transport.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - smux "github.com/libp2p/go-libp2p-core/mux" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/transport" tptu "github.com/libp2p/go-libp2p-transport-upgrader" @@ -21,32 +20,22 @@ type Transport struct { Upgrader *tptu.Upgrader } -func NewWebRtcTransport(upgrader *tptu.Upgrader) *Transport { +func NewTransport(upgrader *tptu.Upgrader) *Transport { s := webrtc.SettingEngine{} // Use Detach data channels mode s.DetachDataChannels() api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) - return &Transport{ - webrtcOptions: webrtc.Configuration{}, - localID: peer.ID(1), - api: api, - Upgrader: upgrader, - } -} -// NewTransport creates a WebRTC transport that signals over a direct HTTP connection. -// It is currently required to provide a muxer. -func NewTransport(webrtcOptions webrtc.Configuration, muxer smux.Multiplexer) *Transport { - s := webrtc.SettingEngine{} - // Use Detach data channels mode - s.DetachDataChannels() + if upgrader == nil { + return nil + } - api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) return &Transport{ - webrtcOptions: webrtcOptions, + webrtcOptions: webrtc.Configuration{}, localID: peer.ID(1), api: api, + Upgrader: upgrader, } } diff --git a/transport_test.go b/transport_test.go index 352d291..7501049 100644 --- a/transport_test.go +++ b/transport_test.go @@ -25,16 +25,16 @@ func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA pee subtests := []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){} if detectrace.WithRace() { - subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ - ttransport.SubtestProtocols, - ttransport.SubtestBasic, + // subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ + // ttransport.SubtestProtocols, + // ttransport.SubtestBasic, - ttransport.SubtestCancel, - ttransport.SubtestPingPong, + // ttransport.SubtestCancel, + // ttransport.SubtestPingPong, - // Stolen from the stream muxer test suite. - ttransport.SubtestStress1Conn1Stream1Msg, - } + // // Stolen from the stream muxer test suite. + // ttransport.SubtestStress1Conn1Stream1Msg, + // } } else { subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ ttransport.SubtestStress1Conn1Stream100Msg, diff --git a/webrtcdirect.go b/webrtcdirect.go index 109d038..0ade943 100644 --- a/webrtcdirect.go +++ b/webrtcdirect.go @@ -18,7 +18,4 @@ var httpma, _ = ma.NewMultiaddr("/http") var _ tpt.Transport = &Transport{} -// var _ tpt.CapableConn = &Conn{} -// var _ tpt.Listener = &Listener{} - var _ smux.MuxedStream = &Stream{} diff --git a/webrtcdirect_test.go b/webrtcdirect_test.go index bbd39a6..07e1414 100644 --- a/webrtcdirect_test.go +++ b/webrtcdirect_test.go @@ -1,48 +1,49 @@ package libp2pwebrtcdirect import ( + "crypto/rand" "testing" - logging "github.com/ipfs/go-log" - + libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" mplex "github.com/libp2p/go-libp2p-mplex" - ma "github.com/multiformats/go-multiaddr" - "github.com/pion/webrtc/v2" -) - -func TestTransport(t *testing.T) { - logging.SetLogLevel("*", "warning") - - ta := NewTransport( - webrtc.Configuration{}, - new(mplex.Transport), - ) - tb := NewTransport( - webrtc.Configuration{}, - new(mplex.Transport), - ) + peer "github.com/libp2p/go-libp2p-peer" + secio "github.com/libp2p/go-libp2p-secio" - addr := "/ip4/127.0.0.1/tcp/0/http/p2p-webrtc-direct" + logging "github.com/ipfs/go-log" + tptu "github.com/libp2p/go-libp2p-transport-upgrader" +) - // TODO: Re-enable normal test suite when not hitting CI limits when using race detector - // utils.SubtestTransport(t, ta, tb, addr, "peerA") - SubtestTransport(t, ta, tb, addr, "peerA") +func newUpgrader(t *testing.T) (*tptu.Upgrader, libp2pcrypto.PrivKey) { + keyPriv, _, err := libp2pcrypto.GenerateSecp256k1Key(rand.Reader) + if err != nil { + t.Fatalf("error creating key: %v", err) + } + secTransp, err := secio.New(keyPriv) + if err != nil { + t.Fatalf("error creating secio transport: %v", err) + } + return &tptu.Upgrader{ + Muxer: mplex.DefaultTransport, + Secure: secTransp, + }, keyPriv } -func TestTransportCantListenUtp(t *testing.T) { - utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/50000") +func TestTransport(t *testing.T) { + logging.SetLogLevel("*", "debug") + aUpgrade, aKey := newUpgrader(t) + bUpgrade, _ := newUpgrader(t) + + aId, err := peer.IDFromPublicKey(aKey.GetPublic()) if err != nil { - t.Fatal(err) + t.Fatalf("error getting id: %v", err) } - tpt := NewTransport( - webrtc.Configuration{}, - new(mplex.Transport), - ) + ta := NewTransport(aUpgrade) + tb := NewTransport(bUpgrade) - _, err = tpt.Listen(utpa) - if err == nil { - t.Fatal("shouldnt be able to listen on utp addr with tcp transport") - } + addr := "/ip4/127.0.0.1/tcp/0/http/p2p-webrtc-direct" + // TODO: Re-enable normal test suite when not hitting CI limits when using race detector + // utils.SubtestTransport(t, ta, tb, addr, "peerA") + SubtestTransport(t, ta, tb, addr, aId) } From e515515c25594d4e72aaf60ba7faee2485f46866 Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 23 Aug 2019 10:51:49 +0200 Subject: [PATCH 4/5] uncomment tests --- transport_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/transport_test.go b/transport_test.go index 7501049..352d291 100644 --- a/transport_test.go +++ b/transport_test.go @@ -25,16 +25,16 @@ func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA pee subtests := []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){} if detectrace.WithRace() { - // subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ - // ttransport.SubtestProtocols, - // ttransport.SubtestBasic, + subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ + ttransport.SubtestProtocols, + ttransport.SubtestBasic, - // ttransport.SubtestCancel, - // ttransport.SubtestPingPong, + ttransport.SubtestCancel, + ttransport.SubtestPingPong, - // // Stolen from the stream muxer test suite. - // ttransport.SubtestStress1Conn1Stream1Msg, - // } + // Stolen from the stream muxer test suite. + ttransport.SubtestStress1Conn1Stream1Msg, + } } else { subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ ttransport.SubtestStress1Conn1Stream100Msg, From 0fbe86f66c0d2405c3da214dedd9b411e58f18aa Mon Sep 17 00:00:00 2001 From: Topper Bowers Date: Fri, 23 Aug 2019 11:03:11 +0200 Subject: [PATCH 5/5] don't allow two closes --- listener.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/listener.go b/listener.go index c359b8a..17ade9a 100644 --- a/listener.go +++ b/listener.go @@ -151,10 +151,14 @@ func (l *Listener) Accept() (transport.CapableConn, error) { // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. func (l *Listener) Close() error { + if l.srv == nil { + return nil // don't close twice + } err := l.srv.Shutdown(context.Background()) if err != nil { return err } + l.srv = nil close(l.accept)