Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit befd91d

Browse files
acudzelig
authored andcommitted
network/hive: move message handlers from Peer to Hive (#1801)
1 parent 89f5fb7 commit befd91d

File tree

3 files changed

+87
-89
lines changed

3 files changed

+87
-89
lines changed

network/hive.go

+86-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package network
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sync"
2223
"time"
@@ -167,7 +168,7 @@ func (h *Hive) connect() {
167168
func (h *Hive) tickHive() {
168169
addr, depth, changed := h.SuggestPeer()
169170
if h.Discovery && changed {
170-
NotifyDepth(uint8(depth), h.Kademlia)
171+
h.NotifyDepth(uint8(depth))
171172
}
172173
if addr != nil {
173174
log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
@@ -194,15 +195,15 @@ func (h *Hive) Run(p *BzzPeer) error {
194195
if h.Discovery {
195196
if changed {
196197
// if depth changed, send to all peers
197-
NotifyDepth(depth, h.Kademlia)
198+
h.NotifyDepth(depth)
198199
} else {
199200
// otherwise just send depth to new peer
200201
dp.NotifyDepth(depth)
201202
}
202-
NotifyPeer(p.BzzAddr, h.Kademlia)
203+
h.NotifyPeer(p.BzzAddr)
203204
}
204205
defer h.Off(dp)
205-
return dp.Run(dp.HandleMsg)
206+
return dp.Run(h.handleMsg(dp))
206207
}
207208

208209
func (h *Hive) trackPeer(p *BzzPeer) {
@@ -329,3 +330,84 @@ func (h *Hive) savePeers() error {
329330
}
330331
return nil
331332
}
333+
334+
var sortPeers = noSortPeers
335+
336+
// handleMsg is the message handler that delegates incoming messages
337+
func (h *Hive) handleMsg(p *Peer) func(context.Context, interface{}) error {
338+
return func(ctx context.Context, msg interface{}) error {
339+
switch msg := msg.(type) {
340+
case *peersMsg:
341+
return h.handlePeersMsg(p, msg)
342+
case *subPeersMsg:
343+
return h.handleSubPeersMsg(ctx, p, msg)
344+
}
345+
346+
return fmt.Errorf("unknown message type: %T", msg)
347+
}
348+
}
349+
350+
// NotifyDepth sends a message to all connections if depth of saturation is changed
351+
func (h *Hive) NotifyDepth(depth uint8) {
352+
f := func(val *Peer, po int) bool {
353+
val.NotifyDepth(depth)
354+
return true
355+
}
356+
h.EachConn(nil, 255, f)
357+
}
358+
359+
// NotifyPeer informs all peers about a newly added node
360+
func (h *Hive) NotifyPeer(p *BzzAddr) {
361+
f := func(val *Peer, po int) bool {
362+
val.NotifyPeer(p, uint8(po))
363+
return true
364+
}
365+
h.EachConn(p.Address(), 255, f)
366+
}
367+
368+
// handlePeersMsg called by the protocol when receiving peerset (for target address)
369+
// list of nodes ([]PeerAddr in peersMsg) is added to the overlay db using the
370+
// Register interface method
371+
func (h *Hive) handlePeersMsg(d *Peer, msg *peersMsg) error {
372+
// register all addresses
373+
if len(msg.Peers) == 0 {
374+
return nil
375+
}
376+
for _, a := range msg.Peers {
377+
d.seen(a)
378+
h.NotifyPeer(a)
379+
}
380+
return h.Register(msg.Peers...)
381+
}
382+
383+
// handleSubPeersMsg handles incoming subPeersMsg
384+
// this message represents the saturation depth of the remote peer
385+
// saturation depth is the radius within which the peer subscribes to peers
386+
// the first time this is received we send peer info on all
387+
// our connected peers that fall within peers saturation depth
388+
// otherwise this depth is just recorded on the peer, so that
389+
// subsequent new connections are sent iff they fall within the radius
390+
func (h *Hive) handleSubPeersMsg(ctx context.Context, d *Peer, msg *subPeersMsg) error {
391+
d.setDepth(msg.Depth)
392+
// only send peers after the initial subPeersMsg
393+
if !d.sentPeers {
394+
var peers []*BzzAddr
395+
// iterate connection in ascending order of disctance from the remote address
396+
h.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
397+
// terminate if we are beyond the radius
398+
if uint8(po) < msg.Depth {
399+
return false
400+
}
401+
if !d.seen(p.BzzAddr) { // here just records the peer sent
402+
peers = append(peers, p.BzzAddr)
403+
}
404+
return true
405+
})
406+
// if useful peers are found, send them over
407+
if len(peers) > 0 {
408+
go d.Send(ctx, &peersMsg{Peers: sortPeers(peers)})
409+
}
410+
}
411+
d.sentPeers = true
412+
return nil
413+
}

network/discovery.go network/peer.go

+1-85
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,12 @@ import (
2525
"github.com/ethersphere/swarm/pot"
2626
)
2727

28-
// discovery bzz extension for requesting and relaying node address records
29-
30-
var sortPeers = noSortPeers
31-
3228
// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver
3329
type Peer struct {
3430
*BzzPeer
3531
kad *Kademlia
3632
sentPeers bool // whether we already sent peer closer to this address
37-
mtx sync.RWMutex //
33+
mtx sync.RWMutex // protect peers map
3834
peers map[string]bool // tracks node records sent to the peer
3935
depth uint8 // the proximity order advertised by remote as depth of saturation
4036
}
@@ -51,39 +47,6 @@ func NewPeer(p *BzzPeer, kad *Kademlia) *Peer {
5147
return d
5248
}
5349

54-
// HandleMsg is the message handler that delegates incoming messages
55-
func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
56-
switch msg := msg.(type) {
57-
58-
case *peersMsg:
59-
return d.handlePeersMsg(msg)
60-
61-
case *subPeersMsg:
62-
return d.handleSubPeersMsg(msg)
63-
64-
default:
65-
return fmt.Errorf("unknown message type: %T", msg)
66-
}
67-
}
68-
69-
// NotifyDepth sends a message to all connections if depth of saturation is changed
70-
func NotifyDepth(depth uint8, kad *Kademlia) {
71-
f := func(val *Peer, po int) bool {
72-
val.NotifyDepth(depth)
73-
return true
74-
}
75-
kad.EachConn(nil, 255, f)
76-
}
77-
78-
// NotifyPeer informs all peers about a newly added node
79-
func NotifyPeer(p *BzzAddr, k *Kademlia) {
80-
f := func(val *Peer, po int) bool {
81-
val.NotifyPeer(p, uint8(po))
82-
return true
83-
}
84-
k.EachConn(p.Address(), 255, f)
85-
}
86-
8750
// NotifyPeer notifies the remote node (recipient) about a peer if
8851
// the peer's PO is within the recipients advertised depth
8952
// OR the peer is closer to the recipient than self
@@ -154,21 +117,6 @@ func (msg peersMsg) String() string {
154117
return fmt.Sprintf("%T: %v", msg, msg.Peers)
155118
}
156119

157-
// handlePeersMsg called by the protocol when receiving peerset (for target address)
158-
// list of nodes ([]PeerAddr in peersMsg) is added to the overlay db using the
159-
// Register interface method
160-
func (d *Peer) handlePeersMsg(msg *peersMsg) error {
161-
// register all addresses
162-
if len(msg.Peers) == 0 {
163-
return nil
164-
}
165-
for _, a := range msg.Peers {
166-
d.seen(a)
167-
NotifyPeer(a, d.kad)
168-
}
169-
return d.kad.Register(msg.Peers...)
170-
}
171-
172120
// subPeers msg is communicating the depth of the overlay table of a peer
173121
type subPeersMsg struct {
174122
Depth uint8
@@ -179,38 +127,6 @@ func (msg subPeersMsg) String() string {
179127
return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth)
180128
}
181129

182-
// handleSubPeersMsg handles incoming subPeersMsg
183-
// this message represents the saturation depth of the remote peer
184-
// saturation depth is the radius within which the peer subscribes to peers
185-
// the first time this is received we send peer info on all
186-
// our connected peers that fall within peers saturation depth
187-
// otherwise this depth is just recorded on the peer, so that
188-
// subsequent new connections are sent iff they fall within the radius
189-
func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error {
190-
d.setDepth(msg.Depth)
191-
// only send peers after the initial subPeersMsg
192-
if !d.sentPeers {
193-
var peers []*BzzAddr
194-
// iterate connection in ascending order of disctance from the remote address
195-
d.kad.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
196-
// terminate if we are beyond the radius
197-
if uint8(po) < msg.Depth {
198-
return false
199-
}
200-
if !d.seen(p.BzzAddr) { // here just records the peer sent
201-
peers = append(peers, p.BzzAddr)
202-
}
203-
return true
204-
})
205-
// if useful peers are found, send them over
206-
if len(peers) > 0 {
207-
go d.Send(context.TODO(), &peersMsg{Peers: sortPeers(peers)})
208-
}
209-
}
210-
d.sentPeers = true
211-
return nil
212-
}
213-
214130
// seen takes a peer address and checks if it was sent to a peer already
215131
// if not, marks the peer as sent
216132
func (d *Peer) seen(p *BzzAddr) bool {
File renamed without changes.

0 commit comments

Comments
 (0)