Skip to content

Commit 45a30f9

Browse files
Send AppErrors from p2p SDK (#2753)
Signed-off-by: Joshua Kim <[email protected]> Co-authored-by: Stephen Buttolph <[email protected]>
1 parent 0bc4617 commit 45a30f9

File tree

13 files changed

+174
-46
lines changed

13 files changed

+174
-46
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/DataDog/zstd v1.5.2
1111
github.com/NYTimes/gziphandler v1.1.1
1212
github.com/antithesishq/antithesis-sdk-go v0.3.8
13-
github.com/ava-labs/coreth v0.13.6-rc.1
13+
github.com/ava-labs/coreth v0.13.6-rc.1.0.20240702201359-ba2ce5367874
1414
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95
1515
github.com/btcsuite/btcd/btcutil v1.1.3
1616
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax
6262
github.com/antithesishq/antithesis-sdk-go v0.3.8 h1:OvGoHxIcOXFJLyn9IJQ5DzByZ3YVAWNBc394ObzDRb8=
6363
github.com/antithesishq/antithesis-sdk-go v0.3.8/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
6464
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
65-
github.com/ava-labs/coreth v0.13.6-rc.1 h1:gRXRokmu0WOlPqyx+mTLWB655e8/w++u6qFcq9Mo7qA=
66-
github.com/ava-labs/coreth v0.13.6-rc.1/go.mod h1:vm9T8qzP7RLo/jR2MKkliPfaiGgWeEpu/PG6fvvPmog=
65+
github.com/ava-labs/coreth v0.13.6-rc.1.0.20240702201359-ba2ce5367874 h1:aTDg0jvO07EvUvBYebmLO25bffe1DAaZZPPL0ooGhIA=
66+
github.com/ava-labs/coreth v0.13.6-rc.1.0.20240702201359-ba2ce5367874/go.mod h1:VhNDxZBsqZQQaUTmIkzdyY8UicIsoTDXlRmPaPL9lkA=
6767
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95 h1:dOVbtdnZL++pENdTCNZ1nu41eYDQkTML4sWebDnnq8c=
6868
github.com/ava-labs/ledger-avalanche/go v0.0.0-20240610153809-9c955cc90a95/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
6969
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=

network/p2p/error.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import "github.com/ava-labs/avalanchego/snow/engine/common"
7+
8+
var (
9+
// ErrUnexpected should be used to indicate that a request failed due to a
10+
// generic error
11+
ErrUnexpected = &common.AppError{
12+
Code: -1,
13+
Message: "unexpected error",
14+
}
15+
// ErrUnregisteredHandler should be used to indicate that a request failed
16+
// due to it not matching a registered handler
17+
ErrUnregisteredHandler = &common.AppError{
18+
Code: -2,
19+
Message: "unregistered handler",
20+
}
21+
// ErrNotValidator should be used to indicate that a request failed due to
22+
// the requesting peer not being a validator
23+
ErrNotValidator = &common.AppError{
24+
Code: -3,
25+
Message: "not a validator",
26+
}
27+
// ErrThrottled should be used to indicate that a request failed due to the
28+
// requesting peer exceeding a rate limit
29+
ErrThrottled = &common.AppError{
30+
Code: -4,
31+
Message: "throttled",
32+
}
33+
)

network/p2p/gossip/handler.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/ava-labs/avalanchego/ids"
1313
"github.com/ava-labs/avalanchego/network/p2p"
14+
"github.com/ava-labs/avalanchego/snow/engine/common"
1415
"github.com/ava-labs/avalanchego/utils/bloom"
1516
"github.com/ava-labs/avalanchego/utils/logging"
1617
)
@@ -43,10 +44,10 @@ type Handler[T Gossipable] struct {
4344
targetResponseSize int
4445
}
4546

46-
func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, error) {
47+
func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) {
4748
filter, salt, err := ParseAppRequest(requestBytes)
4849
if err != nil {
49-
return nil, err
50+
return nil, p2p.ErrUnexpected
5051
}
5152

5253
responseSize := 0
@@ -73,14 +74,19 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
7374
return responseSize <= h.targetResponseSize
7475
})
7576
if err != nil {
76-
return nil, err
77+
return nil, p2p.ErrUnexpected
7778
}
7879

7980
if err := h.metrics.observeMessage(sentPullLabels, len(gossipBytes), responseSize); err != nil {
80-
return nil, err
81+
return nil, p2p.ErrUnexpected
8182
}
8283

83-
return MarshalAppResponse(gossipBytes)
84+
response, err := MarshalAppResponse(gossipBytes)
85+
if err != nil {
86+
return nil, p2p.ErrUnexpected
87+
}
88+
89+
return response, nil
8490
}
8591

8692
func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {

network/p2p/handler.go

+8-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package p2p
55

66
import (
77
"context"
8-
"errors"
98
"time"
109

1110
"go.uber.org/zap"
@@ -17,8 +16,6 @@ import (
1716
)
1817

1918
var (
20-
ErrNotValidator = errors.New("not a validator")
21-
2219
_ Handler = (*NoOpHandler)(nil)
2320
_ Handler = (*TestHandler)(nil)
2421
_ Handler = (*ValidatorHandler)(nil)
@@ -33,13 +30,14 @@ type Handler interface {
3330
gossipBytes []byte,
3431
)
3532
// AppRequest is called when handling an AppRequest message.
36-
// Returns the bytes for the response corresponding to [requestBytes]
33+
// Sends a response with the response corresponding to [requestBytes] or
34+
// an application-defined error.
3735
AppRequest(
3836
ctx context.Context,
3937
nodeID ids.NodeID,
4038
deadline time.Time,
4139
requestBytes []byte,
42-
) ([]byte, error)
40+
) ([]byte, *common.AppError)
4341
// CrossChainAppRequest is called when handling a CrossChainAppRequest
4442
// message.
4543
// Returns the bytes for the response corresponding to [requestBytes]
@@ -56,7 +54,7 @@ type NoOpHandler struct{}
5654

5755
func (NoOpHandler) AppGossip(context.Context, ids.NodeID, []byte) {}
5856

59-
func (NoOpHandler) AppRequest(context.Context, ids.NodeID, time.Time, []byte) ([]byte, error) {
57+
func (NoOpHandler) AppRequest(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
6058
return nil, nil
6159
}
6260

@@ -95,7 +93,7 @@ func (v ValidatorHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, goss
9593
v.handler.AppGossip(ctx, nodeID, gossipBytes)
9694
}
9795

98-
func (v ValidatorHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
96+
func (v ValidatorHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
9997
if !v.validatorSet.Has(ctx, nodeID) {
10098
return nil, ErrNotValidator
10199
}
@@ -128,7 +126,7 @@ func (r *responder) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID
128126
zap.Binary("message", request),
129127
zap.Error(err),
130128
)
131-
return nil
129+
return r.sender.SendAppError(ctx, nodeID, requestID, err.Code, err.Message)
132130
}
133131

134132
return r.sender.SendAppResponse(ctx, nodeID, requestID, appResponse)
@@ -155,7 +153,7 @@ func (r *responder) CrossChainAppRequest(ctx context.Context, chainID ids.ID, re
155153

156154
type TestHandler struct {
157155
AppGossipF func(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte)
158-
AppRequestF func(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error)
156+
AppRequestF func(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError)
159157
CrossChainAppRequestF func(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error)
160158
}
161159

@@ -167,7 +165,7 @@ func (t TestHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipByt
167165
t.AppGossipF(ctx, nodeID, gossipBytes)
168166
}
169167

170-
func (t TestHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
168+
func (t TestHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
171169
if t.AppRequestF == nil {
172170
return nil, nil
173171
}

network/p2p/handler_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/require"
1212

1313
"github.com/ava-labs/avalanchego/ids"
14+
"github.com/ava-labs/avalanchego/snow/engine/common"
1415
"github.com/ava-labs/avalanchego/utils/logging"
1516
"github.com/ava-labs/avalanchego/utils/set"
1617
)
@@ -79,7 +80,7 @@ func TestValidatorHandlerAppRequest(t *testing.T) {
7980
name string
8081
validatorSet ValidatorSet
8182
nodeID ids.NodeID
82-
expected error
83+
expected *common.AppError
8384
}{
8485
{
8586
name: "message dropped",

network/p2p/network_test.go

+96-12
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestMessageRouting(t *testing.T) {
4343
require.Equal(wantNodeID, nodeID)
4444
require.Equal(wantMsg, msg)
4545
},
46-
AppRequestF: func(_ context.Context, nodeID ids.NodeID, _ time.Time, msg []byte) ([]byte, error) {
46+
AppRequestF: func(_ context.Context, nodeID ids.NodeID, _ time.Time, msg []byte) ([]byte, *common.AppError) {
4747
appRequestCalled = true
4848
require.Equal(wantNodeID, nodeID)
4949
require.Equal(wantMsg, msg)
@@ -352,7 +352,7 @@ func TestCrossChainAppRequestFailed(t *testing.T) {
352352
}
353353

354354
// Messages for unregistered handlers should be dropped gracefully
355-
func TestMessageForUnregisteredHandler(t *testing.T) {
355+
func TestAppGossipMessageForUnregisteredHandler(t *testing.T) {
356356
tests := []struct {
357357
name string
358358
msg []byte
@@ -379,26 +379,110 @@ func TestMessageForUnregisteredHandler(t *testing.T) {
379379
AppGossipF: func(context.Context, ids.NodeID, []byte) {
380380
require.Fail("should not be called")
381381
},
382-
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, error) {
383-
require.Fail("should not be called")
384-
return nil, nil
385-
},
386-
CrossChainAppRequestF: func(context.Context, ids.ID, time.Time, []byte) ([]byte, error) {
382+
}
383+
network, err := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "")
384+
require.NoError(err)
385+
require.NoError(network.AddHandler(handlerID, handler))
386+
require.NoError(network.AppGossip(ctx, ids.EmptyNodeID, tt.msg))
387+
})
388+
}
389+
}
390+
391+
// An unregistered handler should gracefully drop messages by responding
392+
// to the requester with a common.AppError
393+
func TestAppRequestMessageForUnregisteredHandler(t *testing.T) {
394+
tests := []struct {
395+
name string
396+
msg []byte
397+
}{
398+
{
399+
name: "nil",
400+
msg: nil,
401+
},
402+
{
403+
name: "empty",
404+
msg: []byte{},
405+
},
406+
{
407+
name: "non-empty",
408+
msg: []byte("foobar"),
409+
},
410+
}
411+
412+
for _, tt := range tests {
413+
t.Run(tt.name, func(t *testing.T) {
414+
require := require.New(t)
415+
ctx := context.Background()
416+
handler := &TestHandler{
417+
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
387418
require.Fail("should not be called")
388419
return nil, nil
389420
},
390421
}
391-
network, err := NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), "")
422+
423+
wantNodeID := ids.GenerateTestNodeID()
424+
wantRequestID := uint32(111)
425+
426+
done := make(chan struct{})
427+
sender := &common.SenderTest{}
428+
sender.SendAppErrorF = func(_ context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
429+
defer close(done)
430+
431+
require.Equal(wantNodeID, nodeID)
432+
require.Equal(wantRequestID, requestID)
433+
require.Equal(ErrUnregisteredHandler.Code, errorCode)
434+
require.Equal(ErrUnregisteredHandler.Message, errorMessage)
435+
436+
return nil
437+
}
438+
network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
392439
require.NoError(err)
393440
require.NoError(network.AddHandler(handlerID, handler))
394441

395-
require.NoError(network.AppRequest(ctx, ids.EmptyNodeID, 0, time.Time{}, tt.msg))
396-
require.NoError(network.AppGossip(ctx, ids.EmptyNodeID, tt.msg))
397-
require.NoError(network.CrossChainAppRequest(ctx, ids.Empty, 0, time.Time{}, tt.msg))
442+
require.NoError(network.AppRequest(ctx, wantNodeID, wantRequestID, time.Time{}, tt.msg))
443+
<-done
398444
})
399445
}
400446
}
401447

448+
// A handler that errors should send an AppError to the requesting peer
449+
func TestAppError(t *testing.T) {
450+
require := require.New(t)
451+
ctx := context.Background()
452+
appError := &common.AppError{
453+
Code: 123,
454+
Message: "foo",
455+
}
456+
handler := &TestHandler{
457+
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
458+
return nil, appError
459+
},
460+
}
461+
462+
wantNodeID := ids.GenerateTestNodeID()
463+
wantRequestID := uint32(111)
464+
465+
done := make(chan struct{})
466+
sender := &common.SenderTest{}
467+
sender.SendAppErrorF = func(_ context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
468+
defer close(done)
469+
470+
require.Equal(wantNodeID, nodeID)
471+
require.Equal(wantRequestID, requestID)
472+
require.Equal(appError.Code, errorCode)
473+
require.Equal(appError.Message, errorMessage)
474+
475+
return nil
476+
}
477+
network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
478+
require.NoError(err)
479+
require.NoError(network.AddHandler(handlerID, handler))
480+
msg := PrefixMessage(ProtocolPrefix(handlerID), []byte("message"))
481+
482+
require.NoError(network.AppRequest(ctx, wantNodeID, wantRequestID, time.Time{}, msg))
483+
<-done
484+
}
485+
402486
// A response or timeout for a request we never made should return an error
403487
func TestResponseForUnrequestedRequest(t *testing.T) {
404488
tests := []struct {
@@ -427,7 +511,7 @@ func TestResponseForUnrequestedRequest(t *testing.T) {
427511
AppGossipF: func(context.Context, ids.NodeID, []byte) {
428512
require.Fail("should not be called")
429513
},
430-
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, error) {
514+
AppRequestF: func(context.Context, ids.NodeID, time.Time, []byte) ([]byte, *common.AppError) {
431515
require.Fail("should not be called")
432516
return nil, nil
433517
},

network/p2p/router.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,18 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui
128128
start := time.Now()
129129
parsedMsg, handler, handlerID, ok := r.parse(request)
130130
if !ok {
131-
r.log.Debug("failed to process message",
131+
r.log.Debug("received message for unregistered handler",
132132
zap.Stringer("messageOp", message.AppRequestOp),
133133
zap.Stringer("nodeID", nodeID),
134134
zap.Uint32("requestID", requestID),
135135
zap.Time("deadline", deadline),
136136
zap.Binary("message", request),
137137
)
138-
return nil
138+
139+
// Send an error back to the requesting peer. Invalid requests that we
140+
// cannot parse a handler id for are handled the same way as requests
141+
// for which we do not have a registered handler.
142+
return r.sender.SendAppError(ctx, nodeID, requestID, ErrUnregisteredHandler.Code, ErrUnregisteredHandler.Message)
139143
}
140144

141145
// call the corresponding handler and send back a response to nodeID
@@ -209,7 +213,7 @@ func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte
209213
start := time.Now()
210214
parsedMsg, handler, handlerID, ok := r.parse(gossip)
211215
if !ok {
212-
r.log.Debug("failed to process message",
216+
r.log.Debug("received message for unregistered handler",
213217
zap.Stringer("messageOp", message.AppGossipOp),
214218
zap.Stringer("nodeID", nodeID),
215219
zap.Binary("message", gossip),
@@ -244,7 +248,7 @@ func (r *router) CrossChainAppRequest(
244248
start := time.Now()
245249
parsedMsg, handler, handlerID, ok := r.parse(msg)
246250
if !ok {
247-
r.log.Debug("failed to process message",
251+
r.log.Debug("received message for unregistered handler",
248252
zap.Stringer("messageOp", message.CrossChainAppRequestOp),
249253
zap.Stringer("chainID", chainID),
250254
zap.Uint32("requestID", requestID),

0 commit comments

Comments
 (0)