This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 16db47b

p2p, network, bzzeth: p2p protocol handlers to async (#2018)
p2p, network, bzzeth, pss: use async run from p2p/protocol
1 parent b3d0708 commit 16db47b
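
The commit message points at the async run support in p2p/protocol: the handlers touched below no longer spawn their own goroutines but return an error, and the protocol layer runs them asynchronously and drops the peer when a handler fails. A minimal sketch of that dispatch pattern, using assumed names (runAsync, drop) rather than the actual p2p API:

package sketch

import "context"

// handler is the shape the message handlers in this commit now have: they
// return an error instead of spawning their own goroutine.
type handler func(ctx context.Context, msg interface{}) error

// runAsync (an assumed name, not the actual p2p/protocol API) runs the handler
// in its own goroutine so a slow handler does not block the peer's read loop,
// and still turns a returned error into a peer drop.
func runAsync(ctx context.Context, h handler, msg interface{}, drop func(reason string)) {
	go func() {
		if err := h(ctx, msg); err != nil {
			drop(err.Error()) // handler errors disconnect the peer
		}
	}()
}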

File tree: 10 files changed, +542 −340 lines

bzzeth/bzzeth.go (+26 −22)

@@ -20,6 +20,7 @@ import (
     "context"
     "encoding/hex"
     "errors"
+    "fmt"
     "sync"
     "time"
 
@@ -102,11 +103,11 @@ func (b *BzzEth) handleMsg(p *Peer) func(context.Context, interface{}) error {
         p.logger.Trace("bzzeth.handleMsg")
         switch msg := msg.(type) {
         case *NewBlockHeaders:
-            go b.handleNewBlockHeaders(ctx, p, msg)
+            return b.handleNewBlockHeaders(ctx, p, msg)
         case *BlockHeaders:
-            go b.handleBlockHeaders(ctx, p, msg)
+            return b.handleBlockHeaders(ctx, p, msg)
         case *GetBlockHeaders:
-            go b.handleGetBlockHeaders(ctx, p, msg)
+            return b.handleGetBlockHeaders(ctx, p, msg)
         }
         return nil
     }
@@ -123,7 +124,7 @@ func (b *BzzEth) handleMsgFromSwarmNode(p *Peer) func(context.Context, interface
 
 // handleNewBlockHeaders handles new header hashes
 // only request headers that are in Kad Nearest Neighbourhood
-func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlockHeaders) {
+func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlockHeaders) error {
     p.logger.Trace("bzzeth.handleNewBlockHeaders")
 
     // collect the addresses of blocks that are not in our localstore
@@ -135,7 +136,7 @@ func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlo
     yes, err := b.netStore.Store.HasMulti(ctx, addresses...)
     if err != nil {
         log.Error("Error checking hashesh in store", "Reason", err)
-        return
+        return nil
     }
 
     // collect the hashes of block headers we want
@@ -160,7 +161,7 @@ func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlo
     req, err := p.getBlockHeaders(ctx, hashes, deliveries)
     if err != nil {
         p.logger.Error("Error sending GetBlockHeader message", "Reason", err)
-        return
+        return nil
     }
     defer req.cancel()
 
@@ -172,19 +173,24 @@ func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlo
         case hdr, ok := <-deliveries:
             if !ok {
                 p.logger.Debug("bzzeth.handleNewBlockHeaders", "delivered", deliveredCnt)
-                return
+                // todo: introduce better errors
+                return nil
             }
             ch := newChunk(hdr)
             deliveredCnt++
             p.logger.Trace("bzzeth.handleNewBlockHeaders", "hash", ch.Address().Hex(), "delivered", deliveredCnt)
+
+            req.lock.RLock()
             if deliveredCnt == len(req.hashes) {
                 p.logger.Debug("all headers delivered", "count", deliveredCnt)
                 finishDeliveryFunc(req.hashes)
-                return
+                req.lock.RUnlock()
+                return nil
             }
+            req.lock.RUnlock()
         case <-ctx.Done():
             p.logger.Debug("bzzeth.handleNewBlockHeaders", "delivered", deliveredCnt, "err", err)
-            return
+            return nil
         }
     }
 }
@@ -220,15 +226,14 @@ func finishDelivery(hashes map[string]bool) {
 }
 
 // handleBlockHeaders handles block headers message
-func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHeaders) {
+func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHeaders) error {
     p.logger.Debug("bzzeth.handleBlockHeaders", "id", msg.Rid)
 
     // retrieve the request for this id
     req, ok := p.requests.get(msg.Rid)
     if !ok {
-        p.logger.Warn("bzzeth.handleBlockHeaders: nonexisting request id", "id", msg.Rid)
-        p.Drop("nonexisting request id")
-        return
+        return fmt.Errorf("bzzeth.handleBlockHeaders: nonexisting request id %d", msg.Rid)
+
     }
 
     // convert rlp.RawValue to bytes
@@ -237,11 +242,7 @@ func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHead
         headers[i] = h
     }
 
-    err := b.deliverAndStoreAll(ctx, req, headers)
-    if err != nil {
-        p.logger.Warn("bzzeth.handleBlockHeaders: fatal dropping peer", "id", msg.Rid, "err", err)
-        p.Drop("error on deliverAndStoreAll")
-    }
+    return b.deliverAndStoreAll(ctx, req, headers)
 }
 
 // Validates and headers asynchronously and stores the valid chunks in one go
@@ -263,13 +264,14 @@ func (b *BzzEth) deliverAndStoreAll(ctx context.Context, req *request, headers [
             return nil
         })
     }
-    // finish storage is used mostly in testing
-    // in normal scenario.. it just logs Trace
-    defer finishStorageFunc(chunks)
 
     // wait for all validations to get over and close the channels
     err := wg.Wait()
 
+    // finish storage is used mostly in testing
+    // in normal scenario.. it just logs Trace
+    defer finishStorageFunc(chunks)
+
     // We want to store even if there is any validation error.
     // since some headers may be valid in the batch.
     // Store all the valid header chunks in one shot
@@ -354,7 +356,7 @@ func arrangeHeader(hashes []chunk.Address, headers []chunk.Address) []chunk.Addr
 
 // handles GetBlockHeader requests, in the protocol handler this call is asynchronous
 // so it is safe to have it run until delivery is finished
-func (b *BzzEth) handleGetBlockHeaders(ctx context.Context, p *Peer, msg *GetBlockHeaders) {
+func (b *BzzEth) handleGetBlockHeaders(ctx context.Context, p *Peer, msg *GetBlockHeaders) error {
    p.logger.Debug("bzzeth.handleGetBlockHeaders", "id", msg.Rid)
     total := len(msg.Hashes)
     ctx, osp := spancontext.StartSpan(ctx, "bzzeth.handleGetBlockHeaders")
@@ -420,6 +422,8 @@ DELIVERY:
         }
     }
     p.logger.Debug("bzzeth.handleGetBlockHeaders: sent all headers", "id", msg.Rid)
+
+    return nil
 }
 
 var batchWait = 100 * time.Millisecond // time to wait for collecting headers in a batch
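
The new req.lock.RLock()/RUnlock() calls around the delivered-count check guard req.hashes, which other goroutines may still be mutating while deliveries arrive. A minimal sketch of that guard, with a simplified request type rather than the actual bzzeth struct:

package sketch

import "sync"

// request is a simplified stand-in for the bzzeth request type: its hashes map
// is read in the delivery loop while other goroutines may still modify it.
type request struct {
	lock   sync.RWMutex
	hashes map[string]bool
}

// allDelivered reports whether cnt deliveries cover every requested hash.
// Taking the read lock, as handleNewBlockHeaders now does, keeps the length
// read from racing with writers that hold the lock.
func (r *request) allDelivered(cnt int) bool {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return cnt == len(r.hashes)
}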

network/hive.go (+23 −17)

@@ -390,24 +390,30 @@ func (h *Hive) handlePeersMsg(d *Peer, msg *peersMsg) error {
 func (h *Hive) handleSubPeersMsg(ctx context.Context, d *Peer, msg *subPeersMsg) error {
     d.setDepth(msg.Depth)
     // only send peers after the initial subPeersMsg
-    if !d.sentPeers {
-        var peers []*BzzAddr
-        // iterate connection in ascending order of disctance from the remote address
-        h.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
-            // terminate if we are beyond the radius
-            if uint8(po) < msg.Depth {
-                return false
-            }
-            if !d.seen(p.BzzAddr) { // here just records the peer sent
-                peers = append(peers, p.BzzAddr)
-            }
-            return true
-        })
-        // if useful peers are found, send them over
-        if len(peers) > 0 {
-            go d.Send(ctx, &peersMsg{Peers: sortPeers(peers)})
-        }
+    h.lock.Lock()
+    if d.sentPeers {
+        h.lock.Unlock()
+        return nil
     }
+
     d.sentPeers = true
+    h.lock.Unlock()
+
+    var peers []*BzzAddr
+    // iterate connection in ascending order of distance from the remote address
+    h.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
+        // terminate if we are beyond the radius
+        if uint8(po) < msg.Depth {
+            return false
+        }
+        if !d.seen(p.BzzAddr) { // here just records the peer sent
+            peers = append(peers, p.BzzAddr)
+        }
+        return true
+    })
+    // if useful peers are found, send them over
+    if len(peers) > 0 {
+        go d.Send(ctx, &peersMsg{Peers: sortPeers(peers)})
+    }
     return nil
 }
network/retrieval/retrieve.go (+14 −12)

@@ -157,11 +157,9 @@ func (r *Retrieval) handleMsg(p *Peer) func(context.Context, interface{}) error
     return func(ctx context.Context, msg interface{}) error {
         switch msg := msg.(type) {
         case *RetrieveRequest:
-            // we must handle them in a different goroutine otherwise parallel requests
-            // for other chunks from the same peer will get stuck in the queue
-            go r.handleRetrieveRequest(ctx, p, msg)
+            return r.handleRetrieveRequest(ctx, p, msg)
         case *ChunkDelivery:
-            go r.handleChunkDelivery(ctx, p, msg)
+            return r.handleChunkDelivery(ctx, p, msg)
         }
         return nil
     }
@@ -296,7 +294,7 @@ func (r *Retrieval) findPeerLB(ctx context.Context, req *storage.Request) (retPe
 // handleRetrieveRequest handles an incoming retrieve request from a certain Peer
 // if the chunk is found in the localstore it is served immediately, otherwise
 // it results in a new retrieve request to candidate peers in our kademlia
-func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *RetrieveRequest) {
+func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *RetrieveRequest) error {
     p.logger.Debug("retrieval.handleRetrieveRequest", "ref", msg.Addr)
     handleRetrieveRequestMsgCount.Inc(1)
 
@@ -319,7 +317,8 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret
     if err != nil {
         retrieveChunkFail.Inc(1)
         p.logger.Trace("netstore.Get can not retrieve chunk", "ref", msg.Addr, "err", err)
-        return
+        // continue in event loop
+        return nil
     }
 
     p.logger.Trace("retrieval.handleRetrieveRequest - delivery", "ref", msg.Addr)
@@ -334,22 +333,23 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret
     if err != nil {
         p.logger.Error("retrieval.handleRetrieveRequest - peer delivery failed", "ref", msg.Addr, "err", err)
         osp.LogFields(olog.Bool("delivered", false))
-        return
+        // continue in event loop
+        return nil
     }
     osp.LogFields(olog.Bool("delivered", true))
+
+    return nil
 }
 
 // handleChunkDelivery handles a ChunkDelivery message from a certain peer
 // if the chunk proximity order in relation to our base address is within depth
 // we treat the chunk as a chunk received in syncing
-func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *ChunkDelivery) {
+func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *ChunkDelivery) error {
     p.logger.Debug("retrieval.handleChunkDelivery", "ref", msg.Addr)
     err := p.checkRequest(msg.Ruid, msg.Addr)
     if err != nil {
         unsolicitedChunkDelivery.Inc(1)
-        p.logger.Error("unsolicited chunk delivery from peer", "ruid", msg.Ruid, "addr", msg.Addr, "err", err)
-        p.Drop("unsolicited chunk delivery")
-        return
+        return fmt.Errorf("unsolicited chunk delivery from peer: %s", err)
     }
     var osp opentracing.Span
     ctx, osp = spancontext.StartSpan(
@@ -380,9 +380,11 @@ func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *Chunk
     if err != nil {
         p.logger.Error("netstore error putting chunk to localstore", "err", err)
         if err == storage.ErrChunkInvalid {
-            p.Drop("invalid chunk in netstore put")
+            return fmt.Errorf("netstore error putting chunk to localstore: %s", err)
         }
     }
+
+    return nil
 }
 
 // RequestFromPeers sends a chunk retrieve request to the next found peer
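
The retrieval handlers now distinguish two failure classes: protocol violations (unsolicited delivery, invalid chunk) are returned as errors so the p2p layer drops the peer, while local retrieval or delivery failures are only logged so the message loop keeps running. A minimal sketch of that convention, with placeholder function arguments rather than the real Retrieval methods:

package sketch

import (
	"context"
	"fmt"
	"log"
)

// handleChunkDelivery sketches the error convention adopted above: a protocol
// violation is returned and leads to a peer drop, while a local storage
// failure is only logged so the peer stays connected. checkRequest and store
// are placeholders, not the actual retrieval helpers.
func handleChunkDelivery(ctx context.Context, checkRequest func() error, store func(context.Context) error) error {
	if err := checkRequest(); err != nil {
		// unsolicited delivery: returning the error disconnects the peer
		return fmt.Errorf("unsolicited chunk delivery from peer: %s", err)
	}
	if err := store(ctx); err != nil {
		// local trouble: log and continue in the event loop
		log.Printf("storing chunk failed: %v", err)
	}
	return nil
}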

network/retrieval/retrieve_test.go (+3 −3)

@@ -172,7 +172,7 @@ func TestUnsolicitedChunkDelivery(t *testing.T) {
     })
 
     // expect peer disconnection
-    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
+    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: cannot find ruid")})
 
     if err != nil {
         t.Fatal(err)
@@ -246,7 +246,7 @@ func TestUnsolicitedChunkDeliveryFaultyAddr(t *testing.T) {
     }
 
     // expect disconnection
-    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
+    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: retrieve request found but address does not match")})
 
     if err != nil {
         t.Fatal(err)
@@ -334,7 +334,7 @@ func TestUnsolicitedChunkDeliveryDouble(t *testing.T) {
     }
 
     // expect disconnection
-    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
+    err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: cannot find ruid")})
 
     if err != nil {
         t.Fatal(err)
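
The expected disconnect reasons in these tests are the handler errors prefixed with the message code. A hedged sketch of how that string could be assembled; the actual formatting lives in the p2p protocol code, not in this helper:

package sketch

import "fmt"

// wrapHandlerError illustrates the shape of the disconnect reason asserted in
// the tests above: the handler's error prefixed with the message code.
func wrapHandlerError(msgCode uint64, handlerErr error) error {
	return fmt.Errorf("Message handler error: (msg code %v): %v", msgCode, handlerErr)
}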

network/stream/common_test.go (+3 −3)

@@ -275,15 +275,15 @@ type syncPauser struct {
     mu sync.RWMutex
 }
 
-func (p *syncPauser) pause() {
+func (p *syncPauser) Pause() {
     p.mu.Lock()
     if p.c == nil {
         p.c = sync.NewCond(&p.cMu)
     }
     p.mu.Unlock()
 }
 
-func (p *syncPauser) resume() {
+func (p *syncPauser) Resume() {
     p.c.L.Lock()
     p.c.Broadcast()
     p.c.L.Unlock()
@@ -292,7 +292,7 @@ func (p *syncPauser) resume() {
     p.mu.Unlock()
 }
 
-func (p *syncPauser) wait() {
+func (p *syncPauser) Wait() {
     p.mu.RLock()
     if p.c != nil {
         p.c.L.Lock()
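
The syncPauser methods are renamed to their exported forms Pause, Resume and Wait. A self-contained sketch of a sync.Cond-based pauser with that exported surface, assuming a single mutex and a paused flag rather than the real type's mu/cMu pair, purely as an illustration:

package sketch

import "sync"

// pauser is an illustrative sync.Cond-based gate with the exported
// Pause/Resume/Wait surface introduced above; it is not the real syncPauser.
type pauser struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func newPauser() *pauser {
	p := &pauser{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// Pause makes subsequent Wait calls block until Resume is called.
func (p *pauser) Pause() {
	p.mu.Lock()
	p.paused = true
	p.mu.Unlock()
}

// Resume releases every goroutine blocked in Wait.
func (p *pauser) Resume() {
	p.mu.Lock()
	p.paused = false
	p.mu.Unlock()
	p.cond.Broadcast()
}

// Wait blocks while the pauser is paused.
func (p *pauser) Wait() {
	p.mu.Lock()
	for p.paused {
		p.cond.Wait()
	}
	p.mu.Unlock()
}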
