Skip to content

Commit 3740e98

Browse files
committed
feat(responsemanager): clarify response completion
only delete requests when they finish going over the network. put requests that are not processing but still going over the network in a state of CompletingSend
1 parent f08c2ed commit 3740e98

File tree

9 files changed

+102
-93
lines changed

9 files changed

+102
-93
lines changed

Diff for: graphsync.go

+5
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ const (
345345
Running
346346
// Paused means a request is paused
347347
Paused
348+
// CompletingSend means we have processed a query and are waiting for data to
349+
// go over the network
350+
CompletingSend
348351
)
349352

350353
func (rs RequestState) String() string {
@@ -355,6 +358,8 @@ func (rs RequestState) String() string {
355358
return "running"
356359
case Paused:
357360
return "paused"
361+
case CompletingSend:
362+
return "completing send"
358363
default:
359364
return "unrecognized request state"
360365
}

Diff for: impl/graphsync.go

-2
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
262262
responseManager,
263263
outgoingBlockHooks,
264264
requestUpdatedHooks,
265-
requestorCancelledListeners,
266265
responseAssembler,
267-
network.ConnectionManager(),
268266
)
269267
graphSync := &GraphSync{
270268
network: network,

Diff for: responsemanager/client.go

+10
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,16 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.
232232
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil)
233233
}
234234

235+
// TerminateRequest indicates a request has finished sending data and should no longer be tracked
236+
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
237+
done := make(chan struct{}, 1)
238+
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
239+
select {
240+
case <-rm.ctx.Done():
241+
case <-done:
242+
}
243+
}
244+
235245
// PeerState gets current state of the outgoing responses for a given peer
236246
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
237247
response := make(chan peerstate.PeerState)

Diff for: responsemanager/messages.go

+14
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,17 @@ func (psm *peerStateMessage) handle(rm *ResponseManager) {
127127
case <-rm.ctx.Done():
128128
}
129129
}
130+
131+
type terminateRequestMessage struct {
132+
p peer.ID
133+
requestID graphsync.RequestID
134+
done chan<- struct{}
135+
}
136+
137+
func (trm *terminateRequestMessage) handle(rm *ResponseManager) {
138+
rm.terminateRequest(responseKey{trm.p, trm.requestID})
139+
select {
140+
case <-rm.ctx.Done():
141+
case trm.done <- struct{}{}:
142+
}
143+
}

Diff for: responsemanager/queryexecutor/queryexecutor.go

+10-27
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/ipfs/go-graphsync"
1515
"github.com/ipfs/go-graphsync/ipldutil"
1616
gsmsg "github.com/ipfs/go-graphsync/message"
17-
"github.com/ipfs/go-graphsync/network"
1817
"github.com/ipfs/go-graphsync/notifications"
1918
"github.com/ipfs/go-graphsync/responsemanager/hooks"
2019
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
@@ -54,32 +53,26 @@ type ResponseSignals struct {
5453

5554
// QueryExecutor is responsible for performing individual requests by executing their traversals
5655
type QueryExecutor struct {
57-
ctx context.Context
58-
manager Manager
59-
blockHooks BlockHooks
60-
updateHooks UpdateHooks
61-
cancelledListeners CancelledListeners
62-
responseAssembler ResponseAssembler
63-
connManager network.ConnManager
56+
ctx context.Context
57+
manager Manager
58+
blockHooks BlockHooks
59+
updateHooks UpdateHooks
60+
responseAssembler ResponseAssembler
6461
}
6562

6663
// New creates a new QueryExecutor
6764
func New(ctx context.Context,
6865
manager Manager,
6966
blockHooks BlockHooks,
7067
updateHooks UpdateHooks,
71-
cancelledListeners CancelledListeners,
7268
responseAssembler ResponseAssembler,
73-
connManager network.ConnManager,
7469
) *QueryExecutor {
7570
qm := &QueryExecutor{
76-
blockHooks: blockHooks,
77-
updateHooks: updateHooks,
78-
cancelledListeners: cancelledListeners,
79-
responseAssembler: responseAssembler,
80-
manager: manager,
81-
ctx: ctx,
82-
connManager: connManager,
71+
blockHooks: blockHooks,
72+
updateHooks: updateHooks,
73+
responseAssembler: responseAssembler,
74+
manager: manager,
75+
ctx: ctx,
8376
}
8477
return qm
8578
}
@@ -106,11 +99,6 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
10699

107100
log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
108101
err := qe.executeQuery(pid, rt)
109-
isCancelled := err != nil && ipldutil.IsContextCancelErr(err)
110-
if isCancelled {
111-
qe.connManager.Unprotect(pid, rt.Request.ID().Tag())
112-
qe.cancelledListeners.NotifyCancelledListeners(pid, rt.Request)
113-
}
114102
qe.manager.FinishTask(task, err)
115103
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
116104
return false
@@ -286,11 +274,6 @@ type UpdateHooks interface {
286274
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
287275
}
288276

289-
// CancelledListeners is an interface for notifying listeners that requestor cancelled
290-
type CancelledListeners interface {
291-
NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
292-
}
293-
294277
// ResponseAssembler is an interface that returns sender interfaces for peer responses.
295278
type ResponseAssembler interface {
296279
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error

Diff for: responsemanager/queryexecutor/queryexecutor_test.go

+25-47
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
"github.com/ipfs/go-graphsync"
2525
"github.com/ipfs/go-graphsync/ipldutil"
26-
"github.com/ipfs/go-graphsync/listeners"
2726
gsmsg "github.com/ipfs/go-graphsync/message"
2827
"github.com/ipfs/go-graphsync/notifications"
2928
"github.com/ipfs/go-graphsync/responsemanager/hooks"
@@ -44,7 +43,6 @@ func TestOneBlockTask(t *testing.T) {
4443
notifeeExpect(t, td, 1, td.responseCode)
4544
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
4645
require.Equal(t, 0, td.clearRequestCalls)
47-
require.Equal(t, 0, td.cancelledCalls)
4846
}
4947

5048
func TestSmallGraphTask(t *testing.T) {
@@ -83,7 +81,6 @@ func TestSmallGraphTask(t *testing.T) {
8381
notifeeExpect(t, td, 10, td.responseCode) // AddNotifee called on all blocks
8482
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
8583
require.Equal(t, 0, td.clearRequestCalls)
86-
require.Equal(t, 0, td.cancelledCalls)
8784
})
8885

8986
t.Run("paused by hook", func(t *testing.T) {
@@ -98,7 +95,6 @@ func TestSmallGraphTask(t *testing.T) {
9895
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
9996
require.Equal(t, 1, td.pauseCalls)
10097
require.Equal(t, 0, td.clearRequestCalls)
101-
require.Equal(t, 0, td.cancelledCalls)
10298
})
10399

104100
t.Run("paused by signal", func(t *testing.T) {
@@ -117,7 +113,6 @@ func TestSmallGraphTask(t *testing.T) {
117113
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
118114
require.Equal(t, 1, td.pauseCalls)
119115
require.Equal(t, 0, td.clearRequestCalls)
120-
require.Equal(t, 0, td.cancelledCalls)
121116
})
122117

123118
t.Run("partial cancelled by hook", func(t *testing.T) {
@@ -130,7 +125,6 @@ func TestSmallGraphTask(t *testing.T) {
130125
transactionExpect(t, td, []int{6, 7}, ipldutil.ContextCancelError{}.Error()) // last 2 transactions are ContextCancelled
131126

132127
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
133-
require.Equal(t, 1, td.cancelledCalls)
134128
require.Equal(t, 1, td.clearRequestCalls)
135129
})
136130

@@ -153,7 +147,6 @@ func TestSmallGraphTask(t *testing.T) {
153147
require.Equal(t, 0, td.clearRequestCalls)
154148
// cancelled by signal doesn't mean we get a cancelled call here
155149
// ErrCancelledByCommand is treated differently to a context cancellation error
156-
require.Equal(t, 0, td.cancelledCalls)
157150
})
158151

159152
t.Run("unknown error by hook", func(t *testing.T) {
@@ -168,7 +161,6 @@ func TestSmallGraphTask(t *testing.T) {
168161

169162
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
170163
require.Equal(t, 0, td.clearRequestCalls)
171-
require.Equal(t, 0, td.cancelledCalls)
172164
})
173165

174166
t.Run("unknown error by signal", func(t *testing.T) {
@@ -189,7 +181,6 @@ func TestSmallGraphTask(t *testing.T) {
189181

190182
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
191183
require.Equal(t, 0, td.clearRequestCalls)
192-
require.Equal(t, 0, td.cancelledCalls)
193184
})
194185

195186
t.Run("network error by hook", func(t *testing.T) {
@@ -204,7 +195,6 @@ func TestSmallGraphTask(t *testing.T) {
204195

205196
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
206197
require.Equal(t, 1, td.clearRequestCalls)
207-
require.Equal(t, 0, td.cancelledCalls)
208198
})
209199

210200
t.Run("network error by signal", func(t *testing.T) {
@@ -225,7 +215,6 @@ func TestSmallGraphTask(t *testing.T) {
225215

226216
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
227217
require.Equal(t, 1, td.clearRequestCalls)
228-
require.Equal(t, 0, td.cancelledCalls)
229218
})
230219

231220
t.Run("first block wont load", func(t *testing.T) {
@@ -238,7 +227,6 @@ func TestSmallGraphTask(t *testing.T) {
238227

239228
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
240229
require.Equal(t, 0, td.clearRequestCalls)
241-
require.Equal(t, 0, td.cancelledCalls)
242230
})
243231
}
244232

@@ -277,34 +265,31 @@ func newRandomBlock(index int64) *blockData {
277265
}
278266

279267
type testData struct {
280-
ctx context.Context
281-
t *testing.T
282-
cancel func()
283-
task *peertask.Task
284-
blockStore map[ipld.Link][]byte
285-
persistence ipld.LinkSystem
286-
manager *fauxManager
287-
responseAssembler *fauxResponseAssembler
288-
responseBuilder *fauxResponseBuilder
289-
connManager *testutil.TestConnManager
290-
blockHooks *hooks.OutgoingBlockHooks
291-
updateHooks *hooks.RequestUpdatedHooks
292-
cancelledListeners *listeners.RequestorCancelledListeners
293-
extensionData []byte
294-
extensionName graphsync.ExtensionName
295-
extension graphsync.ExtensionData
296-
requestID graphsync.RequestID
297-
requestCid cid.Cid
298-
requestSelector datamodel.Node
299-
requests []gsmsg.GraphSyncRequest
300-
signals *ResponseSignals
301-
pauseCalls int
302-
clearRequestCalls int
303-
cancelledCalls int
304-
expectedBlocks []*blockData
305-
responseCode graphsync.ResponseStatusCode
306-
peer peer.ID
307-
subscriber *notifications.TopicDataSubscriber
268+
ctx context.Context
269+
t *testing.T
270+
cancel func()
271+
task *peertask.Task
272+
blockStore map[ipld.Link][]byte
273+
persistence ipld.LinkSystem
274+
manager *fauxManager
275+
responseAssembler *fauxResponseAssembler
276+
responseBuilder *fauxResponseBuilder
277+
blockHooks *hooks.OutgoingBlockHooks
278+
updateHooks *hooks.RequestUpdatedHooks
279+
extensionData []byte
280+
extensionName graphsync.ExtensionName
281+
extension graphsync.ExtensionData
282+
requestID graphsync.RequestID
283+
requestCid cid.Cid
284+
requestSelector datamodel.Node
285+
requests []gsmsg.GraphSyncRequest
286+
signals *ResponseSignals
287+
pauseCalls int
288+
clearRequestCalls int
289+
expectedBlocks []*blockData
290+
responseCode graphsync.ResponseStatusCode
291+
peer peer.ID
292+
subscriber *notifications.TopicDataSubscriber
308293
}
309294

310295
func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, *QueryExecutor) {
@@ -318,10 +303,8 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
318303
td.task = &peertask.Task{}
319304
td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task}
320305
td.responseAssembler = &fauxResponseAssembler{}
321-
td.connManager = testutil.NewTestConnManager()
322306
td.blockHooks = hooks.NewBlockHooks()
323307
td.updateHooks = hooks.NewUpdateHooks()
324-
td.cancelledListeners = listeners.NewRequestorCancelledListeners()
325308
td.requestID = graphsync.RequestID(rand.Int31())
326309
td.requestCid, _ = cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
327310
td.requestSelector = basicnode.NewInt(rand.Int63())
@@ -401,18 +384,13 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
401384
td.responseAssembler.responseBuilder.pauseCb = func() {
402385
td.pauseCalls++
403386
}
404-
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
405-
td.cancelledCalls++
406-
})
407387

408388
qe := New(
409389
td.ctx,
410390
td.manager,
411391
td.blockHooks,
412392
td.updateHooks,
413-
td.cancelledListeners,
414393
td.responseAssembler,
415-
td.connManager,
416394
)
417395
return td, qe
418396
}

Diff for: responsemanager/responsemanager_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ func TestCancellationQueryInProgress(t *testing.T) {
8787
})
8888
cancelledListenerCalled := make(chan struct{}, 1)
8989
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
90-
td.connManager.RefuteProtected(t, td.p)
9190
cancelledListenerCalled <- struct{}{}
9291
})
9392
responseManager.Startup()
@@ -105,6 +104,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
105104
close(waitForCancel)
106105

107106
testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
107+
td.connManager.RefuteProtected(t, td.p)
108108

109109
td.assertRequestCleared()
110110
}
@@ -1138,7 +1138,7 @@ func (td *testData) alternateLoaderResponseManager() *ResponseManager {
11381138
}
11391139

11401140
func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecutor.QueryExecutor {
1141-
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.cancelledListeners, td.responseAssembler, td.connManager)
1141+
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.responseAssembler)
11421142
}
11431143

11441144
func (td *testData) assertPausedRequest() {

0 commit comments

Comments
 (0)