Skip to content

Commit 832cf47

Browse files
committed
Check opened streams periodically
Fixes issue #10 Pull request libp2p/go-libp2p-core#250 removed the OpenStream notification. Punchr uses the notification here: https://github.com/dennis-tra/punchr/blob/c4b2fcf93e63ae4a8656864a399638c0c9641372/cmd/client/host.go#L372
1 parent d029589 commit 832cf47

File tree

1 file changed

+37
-67
lines changed

1 file changed

+37
-67
lines changed

cmd/client/host.go

Lines changed: 37 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
)
2525

2626
var (
27+
Version = "0.2.0"
2728
CommunicationTimeout = 15 * time.Second
2829
RetryCount = 3
2930
)
@@ -33,21 +34,19 @@ type Host struct {
3334
host.Host
3435

3536
holePunchEventsPeers sync.Map
36-
streamOpenPeers sync.Map
3737
}
3838

3939
func InitHost(ctx context.Context, privKey crypto.PrivKey) (*Host, error) {
4040
log.Info("Starting libp2p host...")
4141

4242
h := &Host{
4343
holePunchEventsPeers: sync.Map{},
44-
streamOpenPeers: sync.Map{},
4544
}
4645

4746
// Configure new libp2p host
4847
libp2pHost, err := libp2p.New(
4948
libp2p.Identity(privKey),
50-
libp2p.UserAgent("punchr/go-client/0.1.0"),
49+
libp2p.UserAgent("punchr/go-client/"+Version),
5150
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
5251
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic"),
5352
libp2p.ListenAddrStrings("/ip6/::/tcp/0"),
@@ -184,13 +183,13 @@ func (h *Host) HolePunch(ctx context.Context, addrInfo peer.AddrInfo) *HolePunch
184183
for i := 0; i < RetryCount; i++ {
185184
// wait for the DCUtR stream to be opened
186185
select {
187-
case <-h.WaitForDCUtRStream(addrInfo.ID):
188-
// pass
189-
case <-time.After(CommunicationTimeout):
190-
// Stream was not opened in time by the remote.
191-
hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_NO_STREAM
192-
hpState.Error = "/libp2p/dcutr stream was not opened after " + CommunicationTimeout.String()
193-
return hpState
186+
case _, ok := <-h.WaitForDCUtRStream(addrInfo.ID):
187+
if !ok {
188+
// Stream was not opened in time by the remote.
189+
hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_NO_STREAM
190+
hpState.Error = "/libp2p/dcutr stream was not opened after " + CommunicationTimeout.String()
191+
return hpState
192+
}
194193
case <-ctx.Done():
195194
hpState.Outcome = pb.HolePunchOutcome_HOLE_PUNCH_OUTCOME_CANCELLED
196195
hpState.Error = ctx.Err().Error()
@@ -274,25 +273,39 @@ func (hps HolePunchState) TrackHolePunch(ctx context.Context, remoteID peer.ID,
274273
}
275274

276275
func (h *Host) WaitForDCUtRStream(pid peer.ID) <-chan struct{} {
277-
evtChan := make(chan struct{})
278-
h.streamOpenPeers.Store(pid, evtChan)
276+
dcutrOpenedChan := make(chan struct{})
279277

280-
// Exit early if the DCUtR stream is already open
281-
for _, conn := range h.Network().ConnsToPeer(pid) {
282-
for _, stream := range conn.GetStreams() {
283-
if stream.Protocol() == holepunch.Protocol {
284-
// If not found, it was already closed by the open stream handler
285-
if _, found := h.streamOpenPeers.LoadAndDelete(pid); !found {
286-
return evtChan
278+
// The following go routine is a hack. We want to be notified as soon as the remote peer has opened a dcutr stream.
279+
// go-libp2p v0.20.0 has removed the OpenedStream notification (which didn't really work anyway). Now we check
280+
// every 10 ms all streams on all connections for the /libp2p/dcutr stream. If
281+
go func() {
282+
timeout := time.After(CommunicationTimeout)
283+
timer := time.NewTimer(0)
284+
for {
285+
select {
286+
case <-timeout:
287+
close(dcutrOpenedChan)
288+
return
289+
case <-timer.C:
290+
}
291+
292+
for _, conn := range h.Network().ConnsToPeer(pid) {
293+
for _, stream := range conn.GetStreams() {
294+
if stream.Protocol() != holepunch.Protocol {
295+
continue
296+
}
297+
h.logEntry(pid).Debugln("/libp2p/dcutr stream opened!")
298+
dcutrOpenedChan <- struct{}{}
299+
close(dcutrOpenedChan)
300+
return
287301
}
288-
close(evtChan)
289-
return evtChan
290302
}
303+
timer.Reset(10 * time.Millisecond)
291304
}
292-
}
305+
}()
293306

294307
h.logEntry(pid).Infoln("Waiting for /libp2p/dcutr stream...")
295-
return evtChan
308+
return dcutrOpenedChan
296309
}
297310

298311
func (h *Host) RegisterPeerToTrace(pid peer.ID) <-chan *holepunch.Event {
@@ -312,7 +325,7 @@ func (h *Host) UnregisterPeerToTrace(pid peer.ID) {
312325
for {
313326
select {
314327
case evt := <-evtChan:
315-
h.logEntry(pid).WithField("evtType", evt.Type).Infoln("Draining event channel")
328+
h.logEntry(pid).WithField("evtType", evt.Type).Warnln("Draining event channel")
316329
default:
317330
close(evtChan)
318331
return
@@ -362,46 +375,3 @@ func (h *Host) Listen(network.Network, multiaddr.Multiaddr) {}
362375
func (h *Host) ListenClose(network.Network, multiaddr.Multiaddr) {}
363376
func (h *Host) Connected(network.Network, network.Conn) {}
364377
func (h *Host) Disconnected(network.Network, network.Conn) {}
365-
func (h *Host) ClosedStream(_ network.Network, stream network.Stream) {
366-
if stream.Protocol() != holepunch.Protocol {
367-
return
368-
}
369-
h.logEntry(stream.Conn().RemotePeer()).Debugln("/libp2p/dcutr stream closed!")
370-
}
371-
372-
func (h *Host) OpenedStream(_ network.Network, stream network.Stream) {
373-
// The following is a hack. `stream` does not have the `Protocol` field set yet. So we just check
374-
// every 5 ms for a total of 15 s.
375-
go func() {
376-
timeout := time.After(CommunicationTimeout)
377-
timer := time.NewTimer(0)
378-
for {
379-
380-
select {
381-
case <-timeout:
382-
return
383-
case <-timer.C:
384-
}
385-
386-
if stream.Protocol() == "" {
387-
timer.Reset(5 * time.Millisecond)
388-
continue
389-
}
390-
391-
if stream.Protocol() != holepunch.Protocol {
392-
return
393-
}
394-
395-
break
396-
397-
}
398-
399-
val, found := h.streamOpenPeers.LoadAndDelete(stream.Conn().RemotePeer())
400-
if !found {
401-
return
402-
}
403-
404-
h.logEntry(stream.Conn().RemotePeer()).Debugln("/libp2p/dcutr stream opened!")
405-
close(val.(chan struct{}))
406-
}()
407-
}

0 commit comments

Comments
 (0)