Skip to content

Commit e66b39d

Browse files
rvaggmvdan
andauthored
feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID (#355)
* feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID Closes: #349 * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID when using error type T, use *T with As, rather than **T * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID Co-authored-by: Daniel Martí <[email protected]>
1 parent 259905a commit e66b39d

13 files changed

+171
-175
lines changed

graphsync.go

+6-16
Original file line numberDiff line numberDiff line change
@@ -486,25 +486,15 @@ type GraphExchange interface {
486486
// RegisterReceiverNetworkErrorListener adds a listener for when errors occur receiving data over the wire
487487
RegisterReceiverNetworkErrorListener(listener OnReceiverNetworkErrorListener) UnregisterHookFunc
488488

489-
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
490-
// Can also send extensions with unpause
491-
UnpauseRequest(RequestID, ...ExtensionData) error
492-
493-
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
494-
PauseRequest(RequestID) error
489+
// Pause pauses an in progress request or response (may take 1 or more blocks to process)
490+
Pause(context.Context, RequestID) error
495491

496-
// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
492+
// Unpause unpauses a request or response that was paused
497493
// Can also send extensions with unpause
498-
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error
499-
500-
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
501-
PauseResponse(peer.ID, RequestID) error
502-
503-
// CancelResponse cancels an in progress response
504-
CancelResponse(peer.ID, RequestID) error
494+
Unpause(context.Context, RequestID, ...ExtensionData) error
505495

506-
// CancelRequest cancels an in progress request
507-
CancelRequest(context.Context, RequestID) error
496+
// Cancel cancels an in progress request or response
497+
Cancel(context.Context, RequestID) error
508498

509499
// Stats produces insight on the current state of a graphsync exchange
510500
Stats() Stats

impl/graphsync.go

+24-25
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package graphsync
22

33
import (
44
"context"
5+
"errors"
56
"time"
67

78
logging "github.com/ipfs/go-log/v2"
@@ -296,6 +297,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
296297
responseManager.Startup()
297298
responseQueue.Startup(gsConfig.maxInProgressIncomingRequests, queryExecutor)
298299
network.SetDelegate((*graphSyncReceiver)(graphSync))
300+
299301
return graphSync
300302
}
301303

@@ -402,35 +404,32 @@ func (gs *GraphSync) RegisterReceiverNetworkErrorListener(listener graphsync.OnR
402404
return gs.receiverErrorListeners.Register(listener)
403405
}
404406

405-
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
406-
// Can also send extensions with unpause
407-
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
408-
return gs.requestManager.UnpauseRequest(requestID, extensions...)
409-
}
410-
411-
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
412-
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
413-
return gs.requestManager.PauseRequest(requestID)
414-
}
415-
416-
// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
417-
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
418-
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
419-
}
420-
421-
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
422-
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
423-
return gs.responseManager.PauseResponse(p, requestID)
407+
// Pause pauses an in progress request or response
408+
func (gs *GraphSync) Pause(ctx context.Context, requestID graphsync.RequestID) error {
409+
var reqNotFound graphsync.RequestNotFoundErr
410+
if err := gs.requestManager.PauseRequest(ctx, requestID); !errors.As(err, &reqNotFound) {
411+
return err
412+
}
413+
return gs.responseManager.PauseResponse(ctx, requestID)
424414
}
425415

426-
// CancelResponse cancels an in progress response
427-
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
428-
return gs.responseManager.CancelResponse(p, requestID)
416+
// Unpause unpauses a request or response that was paused
417+
// Can also send extensions with unpause
418+
func (gs *GraphSync) Unpause(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
419+
var reqNotFound graphsync.RequestNotFoundErr
420+
if err := gs.requestManager.UnpauseRequest(ctx, requestID, extensions...); !errors.As(err, &reqNotFound) {
421+
return err
422+
}
423+
return gs.responseManager.UnpauseResponse(ctx, requestID, extensions...)
429424
}
430425

431-
// CancelRequest cancels an in progress request
432-
func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
433-
return gs.requestManager.CancelRequest(ctx, requestID)
426+
// Cancel cancels an in progress request or response
427+
func (gs *GraphSync) Cancel(ctx context.Context, requestID graphsync.RequestID) error {
428+
var reqNotFound graphsync.RequestNotFoundErr
429+
if err := gs.requestManager.CancelRequest(ctx, requestID); !errors.As(err, &reqNotFound) {
430+
return err
431+
}
432+
return gs.responseManager.CancelResponse(ctx, requestID)
434433
}
435434

436435
// Stats produces insight on the current state of a graphsync exchange

impl/graphsync_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ func TestPauseResume(t *testing.T) {
723723
require.Len(t, responderPeerState.IncomingState.Diagnostics(), 0)
724724

725725
requestID := <-requestIDChan
726-
err := responder.UnpauseResponse(td.host1.ID(), requestID)
726+
err := responder.Unpause(ctx, requestID)
727727
require.NoError(t, err)
728728

729729
blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
@@ -793,7 +793,7 @@ func TestPauseResumeRequest(t *testing.T) {
793793
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)
794794

795795
requestID := <-requestIDChan
796-
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
796+
err := requestor.Unpause(ctx, requestID, td.extensionUpdate)
797797
require.NoError(t, err)
798798

799799
blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
@@ -1092,7 +1092,7 @@ func TestNetworkDisconnect(t *testing.T) {
10921092
require.NoError(t, td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID()))
10931093
require.NoError(t, td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID()))
10941094
requestID := <-requestIDChan
1095-
err := responder.UnpauseResponse(td.host1.ID(), requestID)
1095+
err := responder.Unpause(ctx, requestID)
10961096
require.NoError(t, err)
10971097

10981098
testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")

requestmanager/client.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -292,9 +292,9 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,
292292

293293
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
294294
// Can also send extensions with unpause
295-
func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
295+
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
296296
response := make(chan error, 1)
297-
rm.send(&unpauseRequestMessage{requestID, extensions, response}, nil)
297+
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
298298
select {
299299
case <-rm.ctx.Done():
300300
return errors.New("context cancelled")
@@ -304,9 +304,9 @@ func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensio
304304
}
305305

306306
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
307-
func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
307+
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
308308
response := make(chan error, 1)
309-
rm.send(&pauseRequestMessage{requestID, response}, nil)
309+
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
310310
select {
311311
case <-rm.ctx.Done():
312312
return errors.New("context cancelled")

requestmanager/requestmanager_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,7 @@ func TestPauseResume(t *testing.T) {
816816

817817
// attempt to unpause while request is not paused (note: hook on second block will keep it from
818818
// reaching pause point)
819-
err := td.requestManager.UnpauseRequest(rr.gsr.ID())
819+
err := td.requestManager.UnpauseRequest(ctx, rr.gsr.ID())
820820
require.EqualError(t, err, "request is not paused")
821821
close(holdForResumeAttempt)
822822
// verify responses sent read ONLY for blocks BEFORE the pause
@@ -834,7 +834,7 @@ func TestPauseResume(t *testing.T) {
834834
td.fal.CleanupRequest(peers[0], rr.gsr.ID())
835835

836836
// unpause
837-
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
837+
err = td.requestManager.UnpauseRequest(ctx, rr.gsr.ID(), td.extension1, td.extension2)
838838
require.NoError(t, err)
839839

840840
// verify the correct new request with Do-no-send-cids & other extensions
@@ -875,7 +875,7 @@ func TestPauseResumeExternal(t *testing.T) {
875875
hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
876876
blocksReceived++
877877
if blocksReceived == pauseAt {
878-
err := td.requestManager.PauseRequest(responseData.RequestID())
878+
err := td.requestManager.PauseRequest(ctx, responseData.RequestID())
879879
require.NoError(t, err)
880880
close(holdForPause)
881881
}
@@ -909,7 +909,7 @@ func TestPauseResumeExternal(t *testing.T) {
909909
td.fal.CleanupRequest(peers[0], rr.gsr.ID())
910910

911911
// unpause
912-
err := td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
912+
err := td.requestManager.UnpauseRequest(ctx, rr.gsr.ID(), td.extension1, td.extension2)
913913
require.NoError(t, err)
914914

915915
// verify the correct new request with Do-no-send-cids & other extensions

requestmanager/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, onTermina
233233
if !ok {
234234
if onTerminated != nil {
235235
select {
236-
case onTerminated <- graphsync.RequestNotFoundErr{}:
236+
case onTerminated <- &graphsync.RequestNotFoundErr{}:
237237
case <-rm.ctx.Done():
238238
}
239239
}

responsemanager/client.go

+19-23
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type inProgressResponseStatus struct {
3333
ctx context.Context
3434
span trace.Span
3535
cancelFn func()
36+
peer peer.ID
3637
request gsmsg.GraphSyncRequest
3738
loader ipld.BlockReadOpener
3839
traverser ipldutil.Traverser
@@ -43,11 +44,6 @@ type inProgressResponseStatus struct {
4344
responseStream responseassembler.ResponseStream
4445
}
4546

46-
type responseKey struct {
47-
p peer.ID
48-
requestID graphsync.RequestID
49-
}
50-
5147
// RequestHooks is an interface for processing request hooks
5248
type RequestHooks interface {
5349
ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
@@ -107,7 +103,7 @@ type ResponseManager struct {
107103
blockSentListeners BlockSentListeners
108104
networkErrorListeners NetworkErrorListeners
109105
messages chan responseManagerMessage
110-
inProgressResponses map[responseKey]*inProgressResponseStatus
106+
inProgressResponses map[graphsync.RequestID]*inProgressResponseStatus
111107
connManager network.ConnManager
112108
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
113109
maxLinksPerRequest uint64
@@ -144,7 +140,7 @@ func New(ctx context.Context,
144140
blockSentListeners: blockSentListeners,
145141
networkErrorListeners: networkErrorListeners,
146142
messages: messages,
147-
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
143+
inProgressResponses: make(map[graphsync.RequestID]*inProgressResponseStatus),
148144
connManager: connManager,
149145
maxLinksPerRequest: maxLinksPerRequest,
150146
responseQueue: responseQueue,
@@ -158,9 +154,9 @@ func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, reque
158154
}
159155

160156
// UnpauseResponse unpauses a response that was previously paused
161-
func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
157+
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
162158
response := make(chan error, 1)
163-
rm.send(&unpauseRequestMessage{p, requestID, response, extensions}, nil)
159+
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
164160
select {
165161
case <-rm.ctx.Done():
166162
return errors.New("context cancelled")
@@ -170,9 +166,9 @@ func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.Reques
170166
}
171167

172168
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
173-
func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
169+
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
174170
response := make(chan error, 1)
175-
rm.send(&pauseRequestMessage{p, requestID, response}, nil)
171+
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
176172
select {
177173
case <-rm.ctx.Done():
178174
return errors.New("context cancelled")
@@ -182,9 +178,9 @@ func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestI
182178
}
183179

184180
// CancelResponse cancels an in progress response
185-
func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
181+
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
186182
response := make(chan error, 1)
187-
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrCancelledByCommand, response}, nil)
183+
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
188184
select {
189185
case <-rm.ctx.Done():
190186
return errors.New("context cancelled")
@@ -204,39 +200,39 @@ func (rm *ResponseManager) synchronize() {
204200
}
205201

206202
// StartTask starts the given task from the peer task queue
207-
func (rm *ResponseManager) StartTask(task *peertask.Task, responseTaskChan chan<- queryexecutor.ResponseTask) {
208-
rm.send(&startTaskRequest{task, responseTaskChan}, nil)
203+
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
204+
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
209205
}
210206

211207
// GetUpdates is called to read pending updates for a task and clear them
212-
func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
213-
rm.send(&responseUpdateRequest{responseKey{p, requestID}, updatesChan}, nil)
208+
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
209+
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
214210
}
215211

216212
// FinishTask marks a task from the task queue as done
217-
func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
213+
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
218214
done := make(chan struct{}, 1)
219-
rm.send(&finishTaskRequest{task, err, done}, nil)
215+
rm.send(&finishTaskRequest{task, p, err, done}, nil)
220216
select {
221217
case <-rm.ctx.Done():
222218
case <-done:
223219
}
224220
}
225221

226222
// CloseWithNetworkError closes a request due to a network error
227-
func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) {
223+
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
228224
done := make(chan error, 1)
229-
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, done}, nil)
225+
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
230226
select {
231227
case <-rm.ctx.Done():
232228
case <-done:
233229
}
234230
}
235231

236232
// TerminateRequest indicates a request has finished sending data and should no longer be tracked
237-
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
233+
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
238234
done := make(chan struct{}, 1)
239-
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
235+
rm.send(&terminateRequestMessage{requestID, done}, nil)
240236
select {
241237
case <-rm.ctx.Done():
242238
case <-done:

0 commit comments

Comments
 (0)