Skip to content

Commit 4fb8a01

Browse files
committed
fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID
1 parent 803fbbe commit 4fb8a01

File tree

3 files changed

+24
-15
lines changed

3 files changed

+24
-15
lines changed

requestmanager/server.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, onTermina
233233
if !ok {
234234
if onTerminated != nil {
235235
select {
236-
case onTerminated <- graphsync.RequestNotFoundErr{}:
236+
case onTerminated <- &graphsync.RequestNotFoundErr{}:
237237
case <-rm.ctx.Done():
238238
}
239239
}
@@ -380,7 +380,7 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
380380
func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync.ExtensionData) error {
381381
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[id]
382382
if !ok {
383-
return graphsync.RequestNotFoundErr{}
383+
return &graphsync.RequestNotFoundErr{}
384384
}
385385
if inProgressRequestStatus.state != graphsync.Paused {
386386
return errors.New("request is not paused")
@@ -394,7 +394,7 @@ func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync
394394
func (rm *RequestManager) pause(id graphsync.RequestID) error {
395395
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[id]
396396
if !ok {
397-
return graphsync.RequestNotFoundErr{}
397+
return &graphsync.RequestNotFoundErr{}
398398
}
399399
if inProgressRequestStatus.state == graphsync.Paused {
400400
return errors.New("request is already paused")

responsemanager/responsemanager_test.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -218,22 +218,33 @@ func TestStats(t *testing.T) {
218218
responseManager := td.nullTaskQueueResponseManager()
219219
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
220220
responseManager.Startup()
221-
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
221+
222+
p1 := td.p
223+
reqid1 := td.requestID
224+
req1 := td.requests
225+
222226
p2 := testutil.GeneratePeers(1)[0]
223-
responseManager.ProcessRequests(td.ctx, p2, td.requests)
224-
peerState := responseManager.PeerState(td.p)
227+
reqid2 := graphsync.NewRequestID()
228+
req2 := []gsmsg.GraphSyncRequest{
229+
gsmsg.NewRequest(reqid2, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), td.extension),
230+
}
231+
232+
responseManager.ProcessRequests(td.ctx, p1, req1)
233+
responseManager.ProcessRequests(td.ctx, p2, req2)
234+
235+
peerState := responseManager.PeerState(p1)
225236
require.Len(t, peerState.RequestStates, 1)
226-
require.Equal(t, peerState.RequestStates[td.requestID], graphsync.Queued)
237+
require.Equal(t, peerState.RequestStates[reqid1], graphsync.Queued)
227238
require.Len(t, peerState.Pending, 1)
228-
require.Equal(t, peerState.Pending[0], td.requestID)
239+
require.Equal(t, peerState.Pending[0], reqid1)
229240
require.Len(t, peerState.Active, 0)
230241
// no inconsistencies
231242
require.Len(t, peerState.Diagnostics(), 0)
232243
peerState = responseManager.PeerState(p2)
233244
require.Len(t, peerState.RequestStates, 1)
234-
require.Equal(t, peerState.RequestStates[td.requestID], graphsync.Queued)
245+
require.Equal(t, peerState.RequestStates[reqid2], graphsync.Queued)
235246
require.Len(t, peerState.Pending, 1)
236-
require.Equal(t, peerState.Pending[0], td.requestID)
247+
require.Equal(t, peerState.Pending[0], reqid2)
237248
require.Len(t, peerState.Active, 0)
238249
// no inconsistencies
239250
require.Len(t, peerState.Diagnostics(), 0)

responsemanager/server.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package responsemanager
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"math"
87
"time"
98

@@ -124,7 +123,7 @@ func (rm *ResponseManager) processUpdate(ctx context.Context, requestID graphsyn
124123
func (rm *ResponseManager) unpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
125124
inProgressResponse, ok := rm.inProgressResponses[requestID]
126125
if !ok {
127-
return graphsync.RequestNotFoundErr{}
126+
return &graphsync.RequestNotFoundErr{}
128127
}
129128
if inProgressResponse.state != graphsync.Paused {
130129
return errors.New("request is not paused")
@@ -148,7 +147,7 @@ func (rm *ResponseManager) abortRequest(ctx context.Context, requestID graphsync
148147
rm.responseQueue.Remove(queueTopic{response.peer, requestID}, response.peer)
149148
}
150149
if !ok || response.state == graphsync.CompletingSend {
151-
return graphsync.RequestNotFoundErr{}
150+
return &graphsync.RequestNotFoundErr{}
152151
}
153152

154153
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, response.span),
@@ -354,7 +353,7 @@ func (rm *ResponseManager) getUpdates(requestID graphsync.RequestID) []gsmsg.Gra
354353
func (rm *ResponseManager) pauseRequest(requestID graphsync.RequestID) error {
355354
inProgressResponse, ok := rm.inProgressResponses[requestID]
356355
if !ok || inProgressResponse.state == graphsync.CompletingSend {
357-
return graphsync.RequestNotFoundErr{}
356+
return &graphsync.RequestNotFoundErr{}
358357
}
359358
if inProgressResponse.state == graphsync.Paused {
360359
return errors.New("request is already paused")
@@ -371,7 +370,6 @@ func (rm *ResponseManager) peerState(p peer.ID) peerstate.PeerState {
371370
rm.responseQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) {
372371
requestStates := make(graphsync.RequestStates)
373372
for key, ipr := range rm.inProgressResponses {
374-
fmt.Printf("%v <> %v == %v\n", ipr.peer.Pretty(), p.Pretty(), ipr.peer == p)
375373
if ipr.peer == p {
376374
requestStates[key] = ipr.state
377375
}

0 commit comments

Comments
 (0)