Skip to content

Commit 732d2f7

Browse files
committed
p2p: manage all port mappings in a single loop
Better to do it like this because the external IP needs to be polled as well.
1 parent ea3090a commit 732d2f7

File tree

3 files changed

+190
-221
lines changed

3 files changed

+190
-221
lines changed

p2p/server.go

+20-92
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/ethereum/go-ethereum/log"
3636
"github.com/ethereum/go-ethereum/p2p/discover"
3737
"github.com/ethereum/go-ethereum/p2p/enode"
38-
"github.com/ethereum/go-ethereum/p2p/enr"
3938
"github.com/ethereum/go-ethereum/p2p/nat"
4039
"github.com/ethereum/go-ethereum/p2p/netutil"
4140
"golang.org/x/exp/slices"
@@ -195,6 +194,9 @@ type Server struct {
195194
discmix *enode.FairMix
196195
dialsched *dialScheduler
197196

197+
// This is read by the NAT port mapping loop.
198+
portMappingRegister chan *portMapping
199+
198200
// Channels into the run loop.
199201
quit chan struct{}
200202
addtrusted chan *enode.Node
@@ -483,6 +485,11 @@ func (srv *Server) Start() (err error) {
483485
if err := srv.setupLocalNode(); err != nil {
484486
return err
485487
}
488+
489+
// The NAT protocol mapping channel can receive up to two messages: one for
490+
// enabling TCP port mapping, and one more for enabling the UDP port mapping.
491+
srv.setupPortMapping()
492+
486493
if srv.ListenAddr != "" {
487494
if err := srv.setupListening(); err != nil {
488495
return err
@@ -521,24 +528,6 @@ func (srv *Server) setupLocalNode() error {
521528
srv.localnode.Set(e)
522529
}
523530
}
524-
switch srv.NAT.(type) {
525-
case nil:
526-
// No NAT interface, do nothing.
527-
case nat.ExtIP:
528-
// ExtIP doesn't block, set the IP right away.
529-
ip, _ := srv.NAT.ExternalIP()
530-
srv.localnode.SetStaticIP(ip)
531-
default:
532-
// Ask the router about the IP. This takes a while and blocks startup,
533-
// do it in the background.
534-
srv.loopWG.Add(1)
535-
go func() {
536-
defer srv.loopWG.Done()
537-
if ip, err := srv.NAT.ExternalIP(); err == nil {
538-
srv.localnode.SetStaticIP(ip)
539-
}
540-
}()
541-
}
542531
return nil
543532
}
544533

@@ -657,12 +646,12 @@ func (srv *Server) setupListening() error {
657646

658647
// Update the local node record and map the TCP listening port if NAT is configured.
659648
tcp, isTCP := listener.Addr().(*net.TCPAddr)
660-
if srv.NAT != nil && isTCP && !tcp.IP.IsLoopback() {
661-
srv.loopWG.Add(1)
662-
go func() {
663-
srv.natMapLoop(srv.NAT, "tcp", tcp.Port, tcp.Port, "ethereum p2p", nat.DefaultMapTimeout)
664-
srv.loopWG.Done()
665-
}()
649+
if isTCP && !tcp.IP.IsLoopback() {
650+
srv.portMappingRegister <- &portMapping{
651+
protocol: "TCP",
652+
name: "ethereum p2p",
653+
port: tcp.Port,
654+
}
666655
}
667656

668657
srv.loopWG.Add(1)
@@ -690,78 +679,17 @@ func (srv *Server) setupUDPListening() (*net.UDPConn, error) {
690679
srv.localnode.SetFallbackUDP(realaddr.Port)
691680
srv.log.Debug("UDP listener up", "addr", realaddr)
692681

693-
// Enable port mapping if configured.
694-
if srv.NAT != nil && !realaddr.IP.IsLoopback() {
695-
srv.loopWG.Add(1)
696-
go func() {
697-
defer srv.loopWG.Done()
698-
srv.natMapLoop(srv.NAT, "udp", realaddr.Port, realaddr.Port, "ethereum p2p", nat.DefaultMapTimeout)
699-
}()
682+
if !realaddr.IP.IsLoopback() {
683+
srv.portMappingRegister <- &portMapping{
684+
protocol: "UDP",
685+
name: "ethereum peer discovery",
686+
port: realaddr.Port,
687+
}
700688
}
701689

702690
return conn, nil
703691
}
704692

705-
// natMapLoop performs initialization mapping for nat and repeats refresh.
706-
func (srv *Server) natMapLoop(natm nat.Interface, protocol string, intport, extport int, name string, interval time.Duration) {
707-
newLogger := func(p string, e int, i int, n nat.Interface) log.Logger {
708-
return log.New("proto", p, "extport", e, "intport", i, "interface", n)
709-
}
710-
log := newLogger(protocol, extport, intport, natm)
711-
712-
var (
713-
hasMapping bool
714-
internal = intport
715-
external = extport
716-
mapTimeout = interval
717-
refresh = time.NewTimer(time.Duration(0))
718-
)
719-
// Set to 0 to perform initial port mapping. This will return C
720-
// immediately and set it to mapTimeout in the next loop.
721-
defer func() {
722-
refresh.Stop()
723-
if hasMapping {
724-
log.Debug("Deleting port mapping")
725-
natm.DeleteMapping(protocol, external, internal)
726-
}
727-
}()
728-
729-
loop:
730-
for {
731-
select {
732-
case <-srv.quit:
733-
return
734-
735-
case <-refresh.C:
736-
log.Trace("Start port mapping")
737-
p, err := natm.AddMapping(protocol, external, internal, name, mapTimeout)
738-
if err != nil {
739-
hasMapping = false
740-
log.Debug("Couldn't add port mapping", "err", err)
741-
continue loop
742-
}
743-
// It was mapped!
744-
hasMapping = true
745-
if p != uint16(external) {
746-
external = int(p)
747-
log = newLogger(protocol, external, internal, natm)
748-
log.Info("NAT mapped alternative port")
749-
} else {
750-
log.Info("NAT mapped port")
751-
}
752-
753-
// Update port in local ENR.
754-
switch protocol {
755-
case "tcp":
756-
srv.localnode.Set(enr.TCP(external))
757-
case "udp":
758-
srv.localnode.SetFallbackUDP(external)
759-
}
760-
refresh.Reset(mapTimeout)
761-
}
762-
}
763-
}
764-
765693
// doPeerOp runs fn on the main loop.
766694
func (srv *Server) doPeerOp(fn peerOpFunc) {
767695
select {

p2p/server_nat.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package p2p
2+
3+
import (
4+
"net"
5+
"time"
6+
7+
"github.com/ethereum/go-ethereum/common/mclock"
8+
"github.com/ethereum/go-ethereum/log"
9+
"github.com/ethereum/go-ethereum/p2p/enr"
10+
"github.com/ethereum/go-ethereum/p2p/nat"
11+
)
12+
13+
const (
14+
portMapDuration = 10 * time.Minute
15+
portMapRefreshInterval = 8 * time.Minute
16+
portMapRetryInterval = 5 * time.Minute
17+
extipRetryInterval = 2 * time.Minute
18+
)
19+
20+
type portMapping struct {
21+
protocol string
22+
name string
23+
port int
24+
25+
// for use by the portMappingLoop goroutine:
26+
extPort int // if non-zero, this is mapped port returned by the NAT interface
27+
nextTime mclock.AbsTime
28+
}
29+
30+
// setupPortMapping starts the port mapping loop if necessary.
31+
// Note: this needs to be called after the LocalNode instance has been set on the server.
32+
func (srv *Server) setupPortMapping() {
33+
// portMappingRegister will receive up to two values: one for
34+
// thr TCP port if listening is enabled, and one more for enabling UDP port mapping
35+
// if discovery is enabled. We make it buffered to avoid blocking their setup while
36+
// a port mapping request is in progress.
37+
srv.portMappingRegister = make(chan *portMapping, 2)
38+
39+
switch srv.NAT.(type) {
40+
case nil:
41+
// No NAT interface, do nothing.
42+
srv.loopWG.Add(1)
43+
go srv.consumePortMappingRequests()
44+
45+
case nat.ExtIP:
46+
// ExtIP doesn't block, set the IP right away.
47+
ip, _ := srv.NAT.ExternalIP()
48+
srv.localnode.SetStaticIP(ip)
49+
srv.loopWG.Add(1)
50+
go srv.consumePortMappingRequests()
51+
52+
default:
53+
// Ask the router about the IP. This takes a while and blocks startup,
54+
// do it in the background.
55+
srv.loopWG.Add(1)
56+
go srv.portMappingLoop()
57+
}
58+
}
59+
60+
func (srv *Server) consumePortMappingRequests() {
61+
defer srv.loopWG.Done()
62+
for {
63+
select {
64+
case <-srv.quit:
65+
return
66+
case <-srv.portMappingRegister:
67+
}
68+
}
69+
}
70+
71+
// portMappingLoop manages port mappings for UDP and TCP.
72+
func (srv *Server) portMappingLoop() {
73+
defer srv.loopWG.Done()
74+
75+
newLogger := func(p string, e int, i int) log.Logger {
76+
return log.New("proto", p, "extport", e, "intport", i, "interface", srv.NAT)
77+
}
78+
79+
var (
80+
refresh = mclock.NewAlarm(srv.clock)
81+
mappings = make(map[string]*portMapping, 2)
82+
)
83+
defer func() {
84+
refresh.Stop()
85+
for _, m := range mappings {
86+
if m.extPort != 0 {
87+
log := newLogger(m.protocol, m.extPort, m.port)
88+
log.Debug("Deleting port mapping")
89+
srv.NAT.DeleteMapping(m.protocol, m.extPort, m.port)
90+
}
91+
}
92+
}()
93+
94+
var (
95+
lastExternalIP net.IP
96+
)
97+
for {
98+
// Schedule next refresh.
99+
for _, p := range mappings {
100+
refresh.Schedule(p.nextTime)
101+
}
102+
103+
select {
104+
case <-srv.quit:
105+
return
106+
107+
case m := <-srv.portMappingRegister:
108+
if m.protocol != "TCP" && m.protocol != "UDP" {
109+
panic("unknown NAT protocol name: " + m.protocol)
110+
}
111+
mappings[m.protocol] = m
112+
m.nextTime = srv.clock.Now()
113+
114+
case <-refresh.C():
115+
now := srv.clock.Now()
116+
117+
// Get/update the external IP address.
118+
extip, err := srv.NAT.ExternalIP()
119+
if err != nil {
120+
log.Debug("Couldn't get external IP", "err", err, "interface", srv.NAT)
121+
srv.localnode.SetStaticIP(nil)
122+
refresh.Schedule(now.Add(extipRetryInterval))
123+
} else if !extip.Equal(lastExternalIP) {
124+
log.Debug("External IP changed", "ip", extip, "interface", srv.NAT)
125+
srv.localnode.SetStaticIP(extip)
126+
lastExternalIP = extip
127+
}
128+
129+
// Update all mappings.
130+
for _, m := range mappings {
131+
if now < m.nextTime {
132+
continue
133+
}
134+
135+
external := m.port
136+
if m.extPort != 0 {
137+
external = m.extPort
138+
}
139+
log := newLogger(m.protocol, external, m.port)
140+
141+
log.Trace("Attempting port mapping")
142+
p, err := srv.NAT.AddMapping(m.protocol, external, m.port, m.name, portMapDuration)
143+
now = srv.clock.Now()
144+
if err != nil {
145+
log.Debug("Couldn't add port mapping", "err", err)
146+
m.extPort = 0
147+
m.nextTime = now.Add(portMapRetryInterval)
148+
continue
149+
}
150+
// It was mapped!
151+
external = int(p)
152+
m.nextTime = now.Add(portMapRefreshInterval)
153+
m.extPort = external
154+
if external != m.port {
155+
log = newLogger(m.protocol, external, m.port)
156+
log.Info("NAT mapped alternative port")
157+
} else {
158+
log.Info("NAT mapped port")
159+
}
160+
// Update port in local ENR.
161+
switch m.protocol {
162+
case "TCP":
163+
srv.localnode.Set(enr.TCP(external))
164+
case "UDP":
165+
srv.localnode.SetFallbackUDP(external)
166+
}
167+
}
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)