Skip to content

Commit fc4fc63

Browse files
committed
fix(requestmanager): make termination predictable
termination from remote peers was not properly handling blocks included up to termination, and could include unpredictable amounts in the response channel
1 parent 9776ae5 commit fc4fc63

File tree

3 files changed

+18
-4
lines changed

3 files changed

+18
-4
lines changed

requestmanager/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type inProgressRequestStatus struct {
6666
inProgressChan chan graphsync.ResponseProgress
6767
inProgressErr chan error
6868
traverser ipldutil.Traverser
69+
traverserCancel context.CancelFunc
6970
}
7071

7172
// PeerHandler is an interface that can send requests to peers

requestmanager/server.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,17 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
109109
LinkBudget: int64(rm.maxLinksPerRequest),
110110
}
111111
}
112+
// the traverser has its own context because we want to fail on block boundaries, in the executor,
113+
// and make sure all blocks included up to the termination message
114+
// are processed and passed in the response channel
115+
ctx, cancel := context.WithCancel(rm.ctx)
116+
ipr.traverserCancel = cancel
112117
ipr.traverser = ipldutil.TraversalBuilder{
113118
Root: cidlink.Link{Cid: ipr.request.Root()},
114119
Selector: ipr.request.Selector(),
115120
Visitor: func(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error {
116121
select {
117-
case <-ipr.ctx.Done():
122+
case <-ctx.Done():
118123
case ipr.inProgressChan <- graphsync.ResponseProgress{
119124
Node: node,
120125
Path: tp.Path,
@@ -126,7 +131,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
126131
Chooser: ipr.nodeStyleChooser,
127132
LinkSystem: rm.linkSystem,
128133
Budget: budget,
129-
}.Start(ipr.ctx)
134+
}.Start(ctx)
130135
}
131136

132137
ipr.state = running
@@ -165,6 +170,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i
165170
ipr.cancelFn()
166171
rm.asyncLoader.CleanupRequest(requestID)
167172
if ipr.traverser != nil {
173+
ipr.traverserCancel()
168174
ipr.traverser.Shutdown(rm.ctx)
169175
}
170176
// make sure context is not closed before closing channels (could cause send
@@ -236,6 +242,10 @@ func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.Gr
236242
filteredResponses = rm.filterResponsesForPeer(filteredResponses, p)
237243
rm.updateLastResponses(filteredResponses)
238244
responseMetadata := metadataForResponses(filteredResponses)
245+
fmt.Println(responseMetadata)
246+
if len(filteredResponses) > 0 {
247+
fmt.Println(filteredResponses[0].Status())
248+
}
239249
rm.asyncLoader.ProcessResponse(responseMetadata, blks)
240250
rm.processTerminations(filteredResponses)
241251
log.Debugf("end processing message for peer %s", p)

testutil/testutil.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,13 @@ func ReadNResponses(ctx context.Context, t TestingT, responseChan <-chan graphsy
152152
var returnedBlocks []graphsync.ResponseProgress
153153
for i := 0; i < count; i++ {
154154
select {
155-
case blk := <-responseChan:
155+
case blk, ok := <-responseChan:
156+
if !ok {
157+
require.FailNowf(t, "Channel closed early", "expected %d messages, got %d", count, len(returnedBlocks))
158+
}
156159
returnedBlocks = append(returnedBlocks, blk)
157160
case <-ctx.Done():
158-
t.Fatal("Unable to read enough responses")
161+
require.FailNow(t, "Unable to read enough responses")
159162
}
160163
}
161164
return returnedBlocks

0 commit comments

Comments
 (0)