Skip to content

Commit 197722e

Browse files
dirkmchannahhoward
andauthored
RegisterNetworkErrorListener should fire when there's an error connecting to the peer (#127)
* fix: RegisterNetworkErrorListener should fire when there's an error connecting to the peer * feat(listeners): move listeners to top level folder * feat(requestmanager): put network error in request manager Put network error in request manager to cover more cases * feat(requestmanager): correct for notification refactor Resolve discripencies with new and old notification interfaces Co-authored-by: hannahhoward <[email protected]>
1 parent e9653de commit 197722e

File tree

8 files changed

+121
-47
lines changed

8 files changed

+121
-47
lines changed

impl/graphsync.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/libp2p/go-libp2p-core/peer"
1010

1111
"github.com/ipfs/go-graphsync"
12+
"github.com/ipfs/go-graphsync/listeners"
1213
gsmsg "github.com/ipfs/go-graphsync/message"
1314
"github.com/ipfs/go-graphsync/messagequeue"
1415
gsnet "github.com/ipfs/go-graphsync/network"
@@ -46,10 +47,10 @@ type GraphSync struct {
4647
incomingRequestHooks *responderhooks.IncomingRequestHooks
4748
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
4849
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
49-
completedResponseListeners *responderhooks.CompletedResponseListeners
50-
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
51-
blockSentListeners *responderhooks.BlockSentListeners
52-
networkErrorListeners *responderhooks.NetworkErrorListeners
50+
completedResponseListeners *listeners.CompletedResponseListeners
51+
requestorCancelledListeners *listeners.RequestorCancelledListeners
52+
blockSentListeners *listeners.BlockSentListeners
53+
networkErrorListeners *listeners.NetworkErrorListeners
5354
incomingResponseHooks *requestorhooks.IncomingResponseHooks
5455
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
5556
incomingBlockHooks *requestorhooks.IncomingBlockHooks
@@ -113,17 +114,17 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
113114
incomingResponseHooks := requestorhooks.NewResponseHooks()
114115
outgoingRequestHooks := requestorhooks.NewRequestHooks()
115116
incomingBlockHooks := requestorhooks.NewBlockHooks()
116-
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
117+
networkErrorListeners := listeners.NewNetworkErrorListeners()
118+
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
117119
peerTaskQueue := peertaskqueue.New()
118120

119121
persistenceOptions := persistenceoptions.New()
120122
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
121123
outgoingBlockHooks := responderhooks.NewBlockHooks()
122124
requestUpdatedHooks := responderhooks.NewUpdateHooks()
123-
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
124-
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
125-
blockSentListeners := responderhooks.NewBlockSentListeners()
126-
networkErrorListeners := responderhooks.NewNetworkErrorListeners()
125+
completedResponseListeners := listeners.NewCompletedResponseListeners()
126+
requestorCancelledListeners := listeners.NewRequestorCancelledListeners()
127+
blockSentListeners := listeners.NewBlockSentListeners()
127128
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
128129
graphSync := &GraphSync{
129130
network: network,

impl/graphsync_test.go

+35
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,41 @@ func TestNetworkDisconnect(t *testing.T) {
611611
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
612612
}
613613

614+
func TestConnectFail(t *testing.T) {
615+
// create network
616+
ctx := context.Background()
617+
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
618+
defer cancel()
619+
td := newGsTestData(ctx, t)
620+
621+
// initialize graphsync on first node to make requests
622+
requestor := td.GraphSyncHost1()
623+
624+
blockChainLength := 100
625+
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)
626+
627+
requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second)
628+
defer requestCancel()
629+
630+
// unlink peers so they cannot communicate
631+
td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID())
632+
td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID())
633+
634+
reqNetworkError := make(chan error, 1)
635+
requestor.RegisterNetworkErrorListener(func(p peer.ID, request graphsync.RequestData, err error) {
636+
select {
637+
case reqNetworkError <- err:
638+
default:
639+
}
640+
})
641+
_, errChan := requestor.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
642+
643+
var err error
644+
testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error")
645+
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
646+
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
647+
}
648+
614649
func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
615650
// create network
616651
ctx := context.Background()

responsemanager/hooks/listeners.go renamed to listeners/listeners.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package hooks
1+
package listeners
22

33
import (
44
"github.com/hannahhoward/go-pubsub"

requestmanager/executor/executor.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/ipfs/go-graphsync/cidset"
1717
"github.com/ipfs/go-graphsync/ipldutil"
1818
gsmsg "github.com/ipfs/go-graphsync/message"
19-
"github.com/ipfs/go-graphsync/notifications"
2019
"github.com/ipfs/go-graphsync/requestmanager/hooks"
2120
"github.com/ipfs/go-graphsync/requestmanager/types"
2221
)
@@ -28,7 +27,7 @@ type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResu
2827
// ExecutionEnv are request parameters that last between requests
2928
type ExecutionEnv struct {
3029
Ctx context.Context
31-
SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee)
30+
SendRequest func(peer.ID, gsmsg.GraphSyncRequest)
3231
RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error
3332
TerminateRequest func(graphsync.RequestID)
3433
WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error)

requestmanager/executor/executor_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/ipfs/go-graphsync/cidset"
2020
"github.com/ipfs/go-graphsync/ipldutil"
2121
gsmsg "github.com/ipfs/go-graphsync/message"
22-
"github.com/ipfs/go-graphsync/notifications"
2322
"github.com/ipfs/go-graphsync/requestmanager/executor"
2423
"github.com/ipfs/go-graphsync/requestmanager/hooks"
2524
"github.com/ipfs/go-graphsync/requestmanager/testloader"
@@ -378,7 +377,7 @@ func (ree *requestExecutionEnv) waitForResume() ([]graphsync.ExtensionData, erro
378377
return extensions, nil
379378
}
380379

381-
func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
380+
func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
382381
ree.requestsSent = append(ree.requestsSent, requestSent{p, request})
383382
if ree.currentWaitForResumeResult < len(ree.loaderRanges) && !request.IsCancel() {
384383
ree.configureLoader(ree.p, ree.request.ID(), ree.tbc, ree.fal, ree.loaderRanges[ree.currentWaitForResumeResult])

requestmanager/requestmanager.go

+42-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import (
66
"fmt"
77
"sync/atomic"
88

9+
"github.com/ipfs/go-graphsync/listeners"
10+
"github.com/ipfs/go-graphsync/messagequeue"
11+
912
blocks "github.com/ipfs/go-block-format"
1013
"github.com/ipfs/go-cid"
1114
logging "github.com/ipfs/go-log"
@@ -74,6 +77,7 @@ type RequestManager struct {
7477
requestHooks RequestHooks
7578
responseHooks ResponseHooks
7679
blockHooks BlockHooks
80+
networkErrorListeners *listeners.NetworkErrorListeners
7781
}
7882

7983
type requestManagerMessage interface {
@@ -100,7 +104,9 @@ func New(ctx context.Context,
100104
asyncLoader AsyncLoader,
101105
requestHooks RequestHooks,
102106
responseHooks ResponseHooks,
103-
blockHooks BlockHooks) *RequestManager {
107+
blockHooks BlockHooks,
108+
networkErrorListeners *listeners.NetworkErrorListeners,
109+
) *RequestManager {
104110
ctx, cancel := context.WithCancel(ctx)
105111
return &RequestManager{
106112
ctx: ctx,
@@ -112,6 +118,7 @@ func New(ctx context.Context,
112118
requestHooks: requestHooks,
113119
responseHooks: responseHooks,
114120
blockHooks: blockHooks,
121+
networkErrorListeners: networkErrorListeners,
115122
}
116123
}
117124

@@ -333,7 +340,7 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re
333340
rm.inProgressRequestStatuses[request.ID()] = requestStatus
334341
incoming, incomingError := executor.ExecutionEnv{
335342
Ctx: rm.ctx,
336-
SendRequest: rm.peerHandler.SendRequest,
343+
SendRequest: rm.sendRequest,
337344
TerminateRequest: rm.terminateRequest,
338345
RunBlockHooks: rm.processBlockHooks,
339346
Loader: rm.asyncLoader.AsyncLoad,
@@ -375,7 +382,7 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) {
375382
return
376383
}
377384

378-
rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
385+
rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
379386
if crm.isPause {
380387
inProgressRequestStatus.paused = true
381388
} else {
@@ -425,7 +432,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
425432
result := rm.responseHooks.ProcessResponseHooks(p, response)
426433
if len(result.Extensions) > 0 {
427434
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
428-
rm.peerHandler.SendRequest(p, updateRequest)
435+
rm.sendRequest(p, updateRequest)
429436
}
430437
if result.Err != nil {
431438
requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
@@ -437,7 +444,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
437444
case requestStatus.networkError <- responseError:
438445
case <-requestStatus.ctx.Done():
439446
}
440-
rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID()))
447+
rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID()))
441448
requestStatus.cancelFn()
442449
return false
443450
}
@@ -482,7 +489,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon
482489
result := rm.blockHooks.ProcessBlockHooks(p, response, block)
483490
if len(result.Extensions) > 0 {
484491
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
485-
rm.peerHandler.SendRequest(p, updateRequest)
492+
rm.sendRequest(p, updateRequest)
486493
}
487494
if result.Err != nil {
488495
_, isPause := result.Err.(hooks.ErrPaused)
@@ -535,6 +542,34 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
535542
return request, hooksResult, nil
536543
}
537544

545+
type reqSubscriber struct {
546+
p peer.ID
547+
request gsmsg.GraphSyncRequest
548+
networkErrorListeners *listeners.NetworkErrorListeners
549+
}
550+
551+
func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
552+
mqEvt, isMQEvt := event.(messagequeue.Event)
553+
if !isMQEvt || mqEvt.Name != messagequeue.Error {
554+
return
555+
}
556+
557+
r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err)
558+
//r.re.networkError <- mqEvt.Err
559+
//r.re.terminateRequest()
560+
}
561+
562+
func (r reqSubscriber) OnClose(topic notifications.Topic) {
563+
}
564+
565+
const requestNetworkError = "request_network_error"
566+
567+
func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
568+
sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
569+
failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
570+
rm.peerHandler.SendRequest(p, request, failNotifee)
571+
}
572+
538573
func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
539574
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id]
540575
if !ok {
@@ -546,7 +581,7 @@ func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
546581
inProgressRequestStatus.paused = false
547582
select {
548583
case <-inProgressRequestStatus.pauseMessages:
549-
rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
584+
rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
550585
return nil
551586
case <-rm.ctx.Done():
552587
return errors.New("context cancelled")

requestmanager/requestmanager_test.go

+22-18
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/ipfs/go-graphsync/listeners"
11+
1012
blocks "github.com/ipfs/go-block-format"
1113
"github.com/ipld/go-ipld-prime"
1214
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
@@ -858,23 +860,24 @@ func TestPauseResumeExternal(t *testing.T) {
858860
}
859861

860862
type testData struct {
861-
requestRecordChan chan requestRecord
862-
fph *fakePeerHandler
863-
fal *testloader.FakeAsyncLoader
864-
requestHooks *hooks.OutgoingRequestHooks
865-
responseHooks *hooks.IncomingResponseHooks
866-
blockHooks *hooks.IncomingBlockHooks
867-
requestManager *RequestManager
868-
blockStore map[ipld.Link][]byte
869-
loader ipld.Loader
870-
storer ipld.Storer
871-
blockChain *testutil.TestBlockChain
872-
extensionName1 graphsync.ExtensionName
873-
extensionData1 []byte
874-
extension1 graphsync.ExtensionData
875-
extensionName2 graphsync.ExtensionName
876-
extensionData2 []byte
877-
extension2 graphsync.ExtensionData
863+
requestRecordChan chan requestRecord
864+
fph *fakePeerHandler
865+
fal *testloader.FakeAsyncLoader
866+
requestHooks *hooks.OutgoingRequestHooks
867+
responseHooks *hooks.IncomingResponseHooks
868+
blockHooks *hooks.IncomingBlockHooks
869+
requestManager *RequestManager
870+
blockStore map[ipld.Link][]byte
871+
loader ipld.Loader
872+
storer ipld.Storer
873+
blockChain *testutil.TestBlockChain
874+
extensionName1 graphsync.ExtensionName
875+
extensionData1 []byte
876+
extension1 graphsync.ExtensionData
877+
extensionName2 graphsync.ExtensionName
878+
extensionData2 []byte
879+
extension2 graphsync.ExtensionData
880+
networkErrorListeners *listeners.NetworkErrorListeners
878881
}
879882

880883
func newTestData(ctx context.Context, t *testing.T) *testData {
@@ -885,7 +888,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
885888
td.requestHooks = hooks.NewRequestHooks()
886889
td.responseHooks = hooks.NewResponseHooks()
887890
td.blockHooks = hooks.NewBlockHooks()
888-
td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
891+
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
892+
td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners)
889893
td.requestManager.SetDelegate(td.fph)
890894
td.requestManager.Startup()
891895
td.blockStore = make(map[ipld.Link][]byte)

responsemanager/responsemanager_test.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/ipfs/go-graphsync"
2121
"github.com/ipfs/go-graphsync/cidset"
2222
"github.com/ipfs/go-graphsync/dedupkey"
23+
"github.com/ipfs/go-graphsync/listeners"
2324
gsmsg "github.com/ipfs/go-graphsync/message"
2425
"github.com/ipfs/go-graphsync/notifications"
2526
"github.com/ipfs/go-graphsync/responsemanager/hooks"
@@ -886,10 +887,10 @@ type testData struct {
886887
requestHooks *hooks.IncomingRequestHooks
887888
blockHooks *hooks.OutgoingBlockHooks
888889
updateHooks *hooks.RequestUpdatedHooks
889-
completedListeners *hooks.CompletedResponseListeners
890-
cancelledListeners *hooks.RequestorCancelledListeners
891-
blockSentListeners *hooks.BlockSentListeners
892-
networkErrorListeners *hooks.NetworkErrorListeners
890+
completedListeners *listeners.CompletedResponseListeners
891+
cancelledListeners *listeners.RequestorCancelledListeners
892+
blockSentListeners *listeners.BlockSentListeners
893+
networkErrorListeners *listeners.NetworkErrorListeners
893894
notifeePublisher *testutil.MockPublisher
894895
blockSends chan graphsync.BlockData
895896
completedResponseStatuses chan graphsync.ResponseStatusCode
@@ -960,10 +961,10 @@ func newTestData(t *testing.T) testData {
960961
td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions)
961962
td.blockHooks = hooks.NewBlockHooks()
962963
td.updateHooks = hooks.NewUpdateHooks()
963-
td.completedListeners = hooks.NewCompletedResponseListeners()
964-
td.cancelledListeners = hooks.NewRequestorCancelledListeners()
965-
td.blockSentListeners = hooks.NewBlockSentListeners()
966-
td.networkErrorListeners = hooks.NewNetworkErrorListeners()
964+
td.completedListeners = listeners.NewCompletedResponseListeners()
965+
td.cancelledListeners = listeners.NewRequestorCancelledListeners()
966+
td.blockSentListeners = listeners.NewBlockSentListeners()
967+
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
967968
td.completedListeners.Register(func(p peer.ID, requestID graphsync.RequestData, status graphsync.ResponseStatusCode) {
968969
select {
969970
case td.completedResponseStatuses <- status:

0 commit comments

Comments
 (0)