Skip to content
This repository was archived by the owner on Nov 22, 2022. It is now read-only.

switch to using the standard libp2p upgrader #33

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 72 additions & 154 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

ic "github.com/libp2p/go-libp2p-core/crypto"
smux "github.com/libp2p/go-libp2p-core/mux"
peer "github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -30,6 +29,16 @@ type connConfig struct {
remoteID peer.ID
}

func (c *connConfig) CopyWithNewRemoteID(remoteID peer.ID) *connConfig {
return &connConfig{
transport: c.transport,
maAddr: c.maAddr,
addr: c.addr,
isServer: c.isServer,
remoteID: remoteID,
}
}

func newConnConfig(transport *Transport, maAddr ma.Multiaddr, isServer bool) (*connConfig, error) {
httpMa := maAddr.Decapsulate(webrtcma)

Expand All @@ -52,21 +61,25 @@ type Conn struct {
config *connConfig

peerConnection *webrtc.PeerConnection
initChannel datachannel.ReadWriteCloser
channel datachannel.ReadWriteCloser

lock sync.RWMutex
accept chan chan detachResult
addr net.Addr

lock sync.RWMutex
accept chan chan detachResult
isMuxed bool
muxedConn smux.MuxedConn
buf []byte
bufStart int
bufEnd int
}

func newConn(config *connConfig, pc *webrtc.PeerConnection, initChannel datachannel.ReadWriteCloser) *Conn {
conn := &Conn{
config: config,
peerConnection: pc,
initChannel: initChannel,
channel: initChannel,
accept: make(chan chan detachResult),
isMuxed: config.transport.muxer != nil,
buf: make([]byte, dcWrapperBufSize),
addr: config.addr,
}

pc.OnDataChannel(func(dc *webrtc.DataChannel) {
Expand Down Expand Up @@ -173,8 +186,10 @@ func (c *Conn) Close() error {
}
c.peerConnection = nil

close(c.accept)

newErr := c.channel.Close()
if err == nil {
err = newErr
}
return err
}

Expand All @@ -187,39 +202,6 @@ func (c *Conn) IsClosed() bool {
return pc == nil
}

// OpenStream creates a new stream.
func (c *Conn) OpenStream() (smux.MuxedStream, error) {
muxed, err := c.getMuxed()
if err != nil {
return nil, err
}
if muxed != nil {
return muxed.OpenStream()
}

rawDC := c.checkInitChannel()
if rawDC == nil {
pc, err := c.getPC()
if err != nil {
return nil, err
}
dc, err := pc.CreateDataChannel("data", nil)
if err != nil {
return nil, err
}

detachRes := detachChannel(dc)

res := <-detachRes
if res.err != nil {
return nil, res.err
}
rawDC = res.dc
}

return newStream(rawDC), nil
}

func (c *Conn) getPC() (*webrtc.PeerConnection, error) {
c.lock.RLock()
pc := c.peerConnection
Expand All @@ -232,91 +214,29 @@ func (c *Conn) getPC() (*webrtc.PeerConnection, error) {
return pc, nil
}

func (c *Conn) getMuxed() (smux.MuxedConn, error) {
c.lock.Lock()
defer c.lock.Unlock()

if !c.isMuxed {
return nil, nil
}

if c.muxedConn != nil {
return c.muxedConn, nil
}

rawDC := c.initChannel
if rawDC == nil {
var err error
rawDC, err = c.awaitAccept()
if err != nil {
return nil, err
}
}

err := c.useMuxer(&dcWrapper{channel: rawDC, addr: c.config.addr, buf: make([]byte, dcWrapperBufSize)}, c.config.transport.muxer)
if err != nil {
return nil, err
}

return c.muxedConn, nil
}

// Note: caller should hold the conn lock.
func (c *Conn) useMuxer(conn net.Conn, muxer smux.Multiplexer) error {
muxed, err := muxer.NewConn(conn, c.config.isServer)
if err != nil {
return err
}
c.muxedConn = muxed

return nil
}

func (c *Conn) checkInitChannel() datachannel.ReadWriteCloser {
c.lock.Lock()
defer c.lock.Unlock()
// Since a WebRTC offer can't be empty the offering side will have
// an initial data channel opened. We return it here, the first time
// OpenStream is called.
if c.initChannel != nil {
ch := c.initChannel
c.initChannel = nil
return ch
}

return nil
}

// AcceptStream accepts a stream opened by the other side.
func (c *Conn) AcceptStream() (smux.MuxedStream, error) {
muxed, err := c.getMuxed()
if err != nil {
return nil, err
}
if muxed != nil {
return muxed.AcceptStream()
}

rawDC := c.checkInitChannel()
if rawDC == nil {
rawDC, err = c.awaitAccept()
}

return newStream(rawDC), nil
}

func (c *Conn) awaitAccept() (datachannel.ReadWriteCloser, error) {
func (c *Conn) awaitAccept(timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
detachRes, ok := <-c.accept
if !ok {
return nil, errors.New("Conn closed")
return errors.New("Conn closed")
}

res := <-detachRes
return res.dc, res.err
select {
case res := <-detachRes:
if res.err == nil {
c.channel = res.dc
}
return res.err
case <-timer.C:
return errors.New("timeout awaiting webrtc accept")
}
return fmt.Errorf("awaitAccept errored in a strange way and skipped a select")
}

// LocalPeer returns our peer ID
func (c *Conn) LocalPeer() peer.ID {
log.Debugf("local peer called, returning: %s", c.config.transport.localID)
// TODO: Base on WebRTC security?
return c.config.transport.localID
}
Expand All @@ -330,6 +250,7 @@ func (c *Conn) LocalPrivateKey() ic.PrivKey {

// RemotePeer returns the peer ID of the remote peer.
func (c *Conn) RemotePeer() peer.ID {
log.Debugf("conn remote peer: %s", c.config.remoteID)
// TODO: Base on WebRTC security?
return c.config.remoteID
}
Expand All @@ -349,6 +270,7 @@ func (c *Conn) LocalMultiaddr() ma.Multiaddr {
// RemoteMultiaddr returns the remote Multiaddr associated
// with this connection
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
log.Debugf("remote maddr: ", c.config.maAddr.String())
return c.config.maAddr
}

Expand All @@ -361,66 +283,62 @@ func (c *Conn) Transport() tpt.Transport {
// packetizing strategy.
const dcWrapperBufSize = math.MaxUint16

// dcWrapper wraps datachannel.ReadWriteCloser to form a net.Conn
type dcWrapper struct {
channel datachannel.ReadWriteCloser
addr net.Addr

buf []byte
bufStart int
bufEnd int
}

func (w *dcWrapper) Read(p []byte) (int, error) {
func (c *Conn) Read(p []byte) (int, error) {
if c.channel == nil {
return 0, errors.New("channel is not open")
}
var err error

if w.bufEnd == 0 {
if c.bufEnd == 0 {
n := 0
n, err = w.channel.Read(w.buf)
w.bufEnd = n
n, err = c.channel.Read(c.buf)
c.bufEnd = n
}

n := 0
if w.bufEnd-w.bufStart > 0 {
n = copy(p, w.buf[w.bufStart:w.bufEnd])
w.bufStart += n
if c.bufEnd-c.bufStart > 0 {
n = copy(p, c.buf[c.bufStart:c.bufEnd])
c.bufStart += n

if w.bufStart >= w.bufEnd {
w.bufStart = 0
w.bufEnd = 0
if c.bufStart >= c.bufEnd {
c.bufStart = 0
c.bufEnd = 0
}
}

return n, err
}

func (w *dcWrapper) Write(p []byte) (n int, err error) {
func (c *Conn) Write(p []byte) (n int, err error) {
if c.channel == nil {
return 0, errors.New("channel is not open")
}
if len(p) > dcWrapperBufSize {
return w.channel.Write(p[:dcWrapperBufSize])
return c.channel.Write(p[:dcWrapperBufSize])
}
return w.channel.Write(p)
}

func (w *dcWrapper) Close() error {
return w.channel.Close()
num, err := c.channel.Write(p)
if err != nil {
log.Errorf("error writing: %v", err)
}
return num, err
}

func (w *dcWrapper) LocalAddr() net.Addr {
return w.addr
func (c *Conn) LocalAddr() net.Addr {
return c.addr
}

func (w *dcWrapper) RemoteAddr() net.Addr {
return w.addr
func (c *Conn) RemoteAddr() net.Addr {
return c.addr
}

func (w *dcWrapper) SetDeadline(t time.Time) error {
func (c *Conn) SetDeadline(t time.Time) error {
return nil
}

func (w *dcWrapper) SetReadDeadline(t time.Time) error {
func (c *Conn) SetReadDeadline(t time.Time) error {
return nil
}

func (w *dcWrapper) SetWriteDeadline(t time.Time) error {
func (c *Conn) SetWriteDeadline(t time.Time) error {
return nil
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ go 1.12
require (
github.com/ipfs/go-log v0.0.1
github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574
github.com/libp2p/go-libp2p-core v0.0.9
github.com/libp2p/go-libp2p-core v0.2.0
github.com/libp2p/go-libp2p-mplex v0.2.1
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-secio v0.2.0
github.com/libp2p/go-libp2p-testing v0.0.4
github.com/libp2p/go-libp2p-transport-upgrader v0.1.1
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-fmt v0.0.1
github.com/multiformats/go-multiaddr-net v0.0.1
Expand Down
Loading