Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 07d6bd5

Browse files
authored
Protocol async errors refactor (#2062)
bzzeth, network, swap, pss, p2p: refactor of errors usage by p2p.protocol
1 parent 1e01aaa commit 07d6bd5

File tree

14 files changed

+293
-378
lines changed

14 files changed

+293
-378
lines changed

bzzeth/bzzeth.go

+11-12
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ func (b *BzzEth) handleMsg(p *Peer) func(context.Context, interface{}) error {
117117
// If any message is received in this case, the peer needs to be dropped
118118
func (b *BzzEth) handleMsgFromSwarmNode(p *Peer) func(context.Context, interface{}) error {
119119
return func(ctx context.Context, msg interface{}) error {
120-
p.logger.Warn("bzzeth.handleMsgFromSwarmNode")
121-
return errRcvdMsgFromSwarmNode
120+
return protocols.Break(errRcvdMsgFromSwarmNode)
122121
}
123122
}
124123

@@ -135,8 +134,7 @@ func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlo
135134
}
136135
yes, err := b.netStore.Store.HasMulti(ctx, addresses...)
137136
if err != nil {
138-
log.Error("Error checking hashesh in store", "Reason", err)
139-
return nil
137+
return fmt.Errorf("checking hashesh in store: %w", err)
140138
}
141139

142140
// collect the hashes of block headers we want
@@ -160,8 +158,7 @@ func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlo
160158
deliveries := make(chan []byte)
161159
req, err := p.getBlockHeaders(ctx, hashes, deliveries)
162160
if err != nil {
163-
p.logger.Error("Error sending GetBlockHeader message", "Reason", err)
164-
return nil
161+
return fmt.Errorf("sending GetBlockHeader message: %w", err)
165162
}
166163
defer req.cancel()
167164

@@ -173,7 +170,6 @@ func (b *BzzEth) handleNewBlockHeaders(ctx context.Context, p *Peer, msg *NewBlo
173170
case hdr, ok := <-deliveries:
174171
if !ok {
175172
p.logger.Debug("bzzeth.handleNewBlockHeaders", "delivered", deliveredCnt)
176-
// todo: introduce better errors
177173
return nil
178174
}
179175
ch := newChunk(hdr)
@@ -232,8 +228,7 @@ func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHead
232228
// retrieve the request for this id
233229
req, ok := p.requests.get(msg.Rid)
234230
if !ok {
235-
return fmt.Errorf("bzzeth.handleBlockHeaders: nonexisting request id %d", msg.Rid)
236-
231+
return protocols.Break(fmt.Errorf("bzzeth.handleBlockHeaders: nonexisting request id %d", msg.Rid))
237232
}
238233

239234
// convert rlp.RawValue to bytes
@@ -242,7 +237,11 @@ func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHead
242237
headers[i] = h
243238
}
244239

245-
return b.deliverAndStoreAll(ctx, req, headers)
240+
if err := b.deliverAndStoreAll(ctx, req, headers); err != nil {
241+
return protocols.Break(err)
242+
}
243+
244+
return nil
246245
}
247246

248247
// Validates and headers asynchronously and stores the valid chunks in one go
@@ -418,11 +417,11 @@ DELIVERY:
418417
if err == nil && deliveredCnt < total {
419418
err := p.Send(ctx, &BlockHeaders{Rid: uint32(msg.Rid)})
420419
if err != nil {
421-
p.logger.Error("could not send empty BlockHeader", "err", err)
420+
return fmt.Errorf("could not send empty BlockHeader: %w", err)
422421
}
423422
}
424-
p.logger.Debug("bzzeth.handleGetBlockHeaders: sent all headers", "id", msg.Rid)
425423

424+
p.logger.Debug("bzzeth.handleGetBlockHeaders: sent all headers", "id", msg.Rid)
426425
return nil
427426
}
428427

network/protocol_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ func TestBzzHandshakeNetworkIDMismatch(t *testing.T) {
279279
err = s.testHandshake(
280280
correctBzzHandshake(s.addr, lightNode),
281281
newBzzHandshakeMsg(TestProtocolVersion, 321, NewBzzAddrFromEnode(node), false),
282-
&p2ptest.Disconnect{Peer: node.ID(), Error: fmt.Errorf("Handshake error: Message handler error: (msg code 0): network id mismatch 321 (!= %v)", TestProtocolNetworkID)},
282+
&p2ptest.Disconnect{Peer: node.ID(), Error: fmt.Errorf("message handler: (msg code 0): network id mismatch 321 (!= %v)", TestProtocolNetworkID)},
283283
)
284284

285285
if err != nil {
@@ -303,7 +303,7 @@ func TestBzzHandshakeVersionMismatch(t *testing.T) {
303303
err = s.testHandshake(
304304
correctBzzHandshake(s.addr, lightNode),
305305
newBzzHandshakeMsg(0, TestProtocolNetworkID, NewBzzAddrFromEnode(node), false),
306-
&p2ptest.Disconnect{Peer: node.ID(), Error: fmt.Errorf("Handshake error: Message handler error: (msg code 0): version mismatch 0 (!= %d)", TestProtocolVersion)},
306+
&p2ptest.Disconnect{Peer: node.ID(), Error: fmt.Errorf("message handler: (msg code 0): version mismatch 0 (!= %d)", TestProtocolVersion)},
307307
)
308308

309309
if err != nil {
@@ -331,7 +331,7 @@ func TestBzzHandshakeInvalidCapabilities(t *testing.T) {
331331
err = s.testHandshake(
332332
correctBzzHandshake(s.addr, lightNode),
333333
msg,
334-
&p2ptest.Disconnect{Peer: node.ID(), Error: fmt.Errorf("Handshake error: Message handler error: (msg code 0): invalid capabilities setting: %s", msg.Addr.Capabilities)},
334+
&p2ptest.Disconnect{Peer: node.ID(), Error: fmt.Errorf("message handler: (msg code 0): invalid capabilities setting: %s", msg.Addr.Capabilities)},
335335
)
336336

337337
if err != nil {

network/retrieval/retrieve.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,7 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret
316316
chunk, err := r.netStore.Get(ctx, chunk.ModeGetRequest, req)
317317
if err != nil {
318318
retrieveChunkFail.Inc(1)
319-
p.logger.Trace("netstore.Get can not retrieve chunk", "ref", msg.Addr, "err", err)
320-
// continue in event loop
321-
return nil
319+
return fmt.Errorf("netstore.Get can not retrieve chunk for ref %s: %w", msg.Addr, err)
322320
}
323321

324322
p.logger.Trace("retrieval.handleRetrieveRequest - delivery", "ref", msg.Addr)
@@ -331,10 +329,7 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret
331329

332330
err = p.Send(ctx, deliveryMsg)
333331
if err != nil {
334-
p.logger.Error("retrieval.handleRetrieveRequest - peer delivery failed", "ref", msg.Addr, "err", err)
335-
osp.LogFields(olog.Bool("delivered", false))
336-
// continue in event loop
337-
return nil
332+
return fmt.Errorf("retrieval.handleRetrieveRequest - peer delivery for ref %s: %w", msg.Addr, err)
338333
}
339334
osp.LogFields(olog.Bool("delivered", true))
340335

@@ -349,7 +344,7 @@ func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *Chunk
349344
err := p.checkRequest(msg.Ruid, msg.Addr)
350345
if err != nil {
351346
unsolicitedChunkDelivery.Inc(1)
352-
return fmt.Errorf("unsolicited chunk delivery from peer: %s", err)
347+
return protocols.Break(fmt.Errorf("unsolicited chunk delivery from peer, ruid %d, addr %s: %w", msg.Ruid, msg.Addr, err))
353348
}
354349
var osp opentracing.Span
355350
ctx, osp = spancontext.StartSpan(
@@ -378,10 +373,11 @@ func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *Chunk
378373

379374
_, err = r.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
380375
if err != nil {
381-
p.logger.Error("netstore error putting chunk to localstore", "err", err)
382376
if err == storage.ErrChunkInvalid {
383-
return fmt.Errorf("netstore error putting chunk to localstore: %s", err)
377+
return protocols.Break(fmt.Errorf("netstore putting chunk to localstore: %w", err))
384378
}
379+
380+
return fmt.Errorf("netstore putting chunk to localstore: %w", err)
385381
}
386382

387383
return nil

network/retrieval/retrieve_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestUnsolicitedChunkDelivery(t *testing.T) {
172172
})
173173

174174
// expect peer disconnection
175-
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: cannot find ruid")})
175+
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
176176

177177
if err != nil {
178178
t.Fatal(err)
@@ -246,7 +246,7 @@ func TestUnsolicitedChunkDeliveryFaultyAddr(t *testing.T) {
246246
}
247247

248248
// expect disconnection
249-
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: retrieve request found but address does not match")})
249+
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
250250

251251
if err != nil {
252252
t.Fatal(err)
@@ -334,7 +334,7 @@ func TestUnsolicitedChunkDeliveryDouble(t *testing.T) {
334334
}
335335

336336
// expect disconnection
337-
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: cannot find ruid")})
337+
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
338338

339339
if err != nil {
340340
t.Fatal(err)

network/stream/peer.go

+11-14
Original file line numberDiff line numberDiff line change
@@ -133,32 +133,29 @@ type want struct {
133133
closeC chan error // signal polling goroutine to terminate due to empty batch or timeout
134134
}
135135

136-
// getOfferOrDrop gets on open offer for the requested ruid
137-
// in case the offer is not found - the peer is dropped
138-
func (p *Peer) getOfferOrDrop(ruid uint) (o offer, shouldBreak bool) {
136+
// getOffer gets on open offer for the requested ruid
137+
// in case the offer is not found - error is returned
138+
func (p *Peer) getOffer(ruid uint) (o offer, err error) {
139139
p.mtx.RLock()
140140
o, ok := p.openOffers[ruid]
141141
p.mtx.RUnlock()
142142
if !ok {
143-
p.logger.Error("ruid not found, dropping peer", "ruid", ruid)
144-
p.Drop("ruid not found")
145-
return o, true
143+
return o, fmt.Errorf("ruid not found: %d", ruid)
144+
146145
}
147-
return o, false
146+
return o, nil
148147
}
149148

150-
// getWantOrDrop gets on open want for the requested ruid
151-
// in case the want is not found - the peer is dropped
152-
func (p *Peer) getWantOrDrop(ruid uint) (w *want, shouldBreak bool) {
149+
// getWant gets on open want for the requested ruid
150+
// in case the want is not found the error is returned
151+
func (p *Peer) getWant(ruid uint) (w *want, err error) {
153152
p.mtx.RLock()
154153
w, ok := p.openWants[ruid]
155154
p.mtx.RUnlock()
156155
if !ok {
157-
p.logger.Error("ruid not found, dropping peer", "ruid", ruid)
158-
p.Drop("ruid not found")
159-
return nil, true
156+
return nil, fmt.Errorf("ruid not found: %d", ruid)
160157
}
161-
return w, false
158+
return w, nil
162159
}
163160

164161
func (p *Peer) addInterval(stream ID, start, end uint64) (err error) {

0 commit comments

Comments
 (0)