Skip to content

Commit 51e2291

Browse files
committed
Handle context cancellation properly (ipfs#428)
* fix(cancellation): handle message cancellation properly. A previous fix, ipfs#391, attempted to address a lock that occurred on context cancel. However, in doing so, it introduced a new lock. Essentially, if a message was not sent to the requestmanager/responsemanager goroutine, waiting for a response to that message could last indefinitely. The previous fix therefore stopped waiting when the calling context was cancelled. However, once a message reaches the goroutine of the requestmanager/responsemanager, it's important that it's processed to completion, so that the message loop doesn't lock. The proper fix is to detect when the message is sent successfully and, if so, wait for it to be processed. If it isn't sent, we can safely abort the goroutine immediately. * fix(race): resolve race condition with test responses
1 parent 226a4d3 commit 51e2291

File tree

2 files changed

+68
-41
lines changed

2 files changed

+68
-41
lines changed

requestmanager/client.go

+34-20
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,14 @@ func (rm *RequestManager) NewRequest(ctx context.Context,
186186

187187
inProgressRequestChan := make(chan inProgressRequest)
188188

189-
rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
189+
err := rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
190+
if err != nil {
191+
return rm.emptyResponse()
192+
}
190193
var receivedInProgressRequest inProgressRequest
191194
select {
192195
case <-rm.ctx.Done():
193196
return rm.emptyResponse()
194-
case <-ctx.Done():
195-
return rm.emptyResponse()
196197
case receivedInProgressRequest = <-inProgressRequestChan:
197198
}
198199

@@ -282,12 +283,13 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
282283
// CancelRequest cancels the given request ID and waits for the request to terminate
283284
func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
284285
terminated := make(chan error, 1)
285-
rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
286+
err := rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
287+
if err != nil {
288+
return err
289+
}
286290
select {
287291
case <-rm.ctx.Done():
288292
return errors.New("context cancelled")
289-
case <-ctx.Done():
290-
return errors.New("context cancelled")
291293
case err := <-terminated:
292294
return err
293295
}
@@ -299,19 +301,20 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,
299301
responses []gsmsg.GraphSyncResponse,
300302
blks []blocks.Block) {
301303

302-
rm.send(&processResponsesMessage{p, responses, blks}, nil)
304+
_ = rm.send(&processResponsesMessage{p, responses, blks}, nil)
303305
}
304306

305307
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
306308
// Can also send extensions with unpause
307309
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
308310
response := make(chan error, 1)
309-
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
311+
err := rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
312+
if err != nil {
313+
return err
314+
}
310315
select {
311316
case <-rm.ctx.Done():
312317
return errors.New("context cancelled")
313-
case <-ctx.Done():
314-
return errors.New("context cancelled")
315318
case err := <-response:
316319
return err
317320
}
@@ -320,12 +323,13 @@ func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsyn
320323
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
321324
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
322325
response := make(chan error, 1)
323-
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
326+
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
327+
if err != nil {
328+
return err
329+
}
324330
select {
325331
case <-rm.ctx.Done():
326332
return errors.New("context cancelled")
327-
case <-ctx.Done():
328-
return errors.New("context cancelled")
329333
case err := <-response:
330334
return err
331335
}
@@ -334,26 +338,27 @@ func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.
334338
// UpdateRequest updates an in progress request
335339
func (rm *RequestManager) UpdateRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
336340
response := make(chan error, 1)
337-
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
341+
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
342+
if err != nil {
343+
return err
344+
}
338345
select {
339346
case <-rm.ctx.Done():
340347
return errors.New("context cancelled")
341-
case <-ctx.Done():
342-
return errors.New("context cancelled")
343348
case err := <-response:
344349
return err
345350
}
346351
}
347352

348353
// GetRequestTask gets data for the given task in the request queue
349354
func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, requestExecutionChan chan executor.RequestTask) {
350-
rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
355+
_ = rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
351356
}
352357

353358
// ReleaseRequestTask releases a task request the requestQueue
354359
func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) {
355360
done := make(chan struct{}, 1)
356-
rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
361+
_ = rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
357362
select {
358363
case <-rm.ctx.Done():
359364
case <-done:
@@ -363,7 +368,7 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err
363368
// PeerState gets stats on all outgoing requests for a given peer
364369
func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState {
365370
response := make(chan peerstate.PeerState)
366-
rm.send(&peerStateMessage{p, response}, nil)
371+
_ = rm.send(&peerStateMessage{p, response}, nil)
367372
select {
368373
case <-rm.ctx.Done():
369374
return peerstate.PeerState{}
@@ -391,11 +396,20 @@ func (rm *RequestManager) Shutdown() {
391396
rm.cancel()
392397
}
393398

394-
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) {
399+
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) error {
400+
// prioritize cancelled context
401+
select {
402+
case <-done:
403+
return errors.New("unable to send message before cancellation")
404+
default:
405+
}
395406
select {
396407
case <-rm.ctx.Done():
408+
return rm.ctx.Err()
397409
case <-done:
410+
return errors.New("unable to send message before cancellation")
398411
case rm.messages <- message:
412+
return nil
399413
}
400414
}
401415

responsemanager/client.go

+34-21
Original file line numberDiff line numberDiff line change
@@ -154,18 +154,19 @@ func New(ctx context.Context,
154154

155155
// ProcessRequests processes incoming requests for the given peer
156156
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
157-
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
157+
_ = rm.send(&processRequestsMessage{p, requests}, ctx.Done())
158158
}
159159

160160
// UnpauseResponse unpauses a response that was previously paused
161161
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
162162
response := make(chan error, 1)
163-
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
163+
err := rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
164+
if err != nil {
165+
return err
166+
}
164167
select {
165168
case <-rm.ctx.Done():
166169
return errors.New("context cancelled")
167-
case <-ctx.Done():
168-
return errors.New("context cancelled")
169170
case err := <-response:
170171
return err
171172
}
@@ -174,12 +175,13 @@ func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphs
174175
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
175176
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
176177
response := make(chan error, 1)
177-
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
178+
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
179+
if err != nil {
180+
return err
181+
}
178182
select {
179183
case <-rm.ctx.Done():
180184
return errors.New("context cancelled")
181-
case <-ctx.Done():
182-
return errors.New("context cancelled")
183185
case err := <-response:
184186
return err
185187
}
@@ -188,12 +190,13 @@ func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsyn
188190
// CancelResponse cancels an in progress response
189191
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
190192
response := make(chan error, 1)
191-
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
193+
err := rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
194+
if err != nil {
195+
return err
196+
}
192197
select {
193198
case <-rm.ctx.Done():
194199
return errors.New("context cancelled")
195-
case <-ctx.Done():
196-
return errors.New("context cancelled")
197200
case err := <-response:
198201
return err
199202
}
@@ -202,12 +205,13 @@ func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsy
202205
// UpdateResponse updates an in progress response
203206
func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
204207
response := make(chan error, 1)
205-
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
208+
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
209+
if err != nil {
210+
return err
211+
}
206212
select {
207213
case <-rm.ctx.Done():
208214
return errors.New("context cancelled")
209-
case <-ctx.Done():
210-
return errors.New("context cancelled")
211215
case err := <-response:
212216
return err
213217
}
@@ -216,7 +220,7 @@ func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsy
216220
// synchronize is a utility method that blocks until all current messages are processed
217221
func (rm *ResponseManager) synchronize() {
218222
sync := make(chan error)
219-
rm.send(&synchronizeMessage{sync}, nil)
223+
_ = rm.send(&synchronizeMessage{sync}, nil)
220224
select {
221225
case <-rm.ctx.Done():
222226
case <-sync:
@@ -225,18 +229,18 @@ func (rm *ResponseManager) synchronize() {
225229

226230
// StartTask starts the given task from the peer task queue
227231
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
228-
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
232+
_ = rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
229233
}
230234

231235
// GetUpdates is called to read pending updates for a task and clear them
232236
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
233-
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
237+
_ = rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
234238
}
235239

236240
// FinishTask marks a task from the task queue as done
237241
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
238242
done := make(chan struct{}, 1)
239-
rm.send(&finishTaskRequest{task, p, err, done}, nil)
243+
_ = rm.send(&finishTaskRequest{task, p, err, done}, nil)
240244
select {
241245
case <-rm.ctx.Done():
242246
case <-done:
@@ -246,7 +250,7 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error)
246250
// CloseWithNetworkError closes a request due to a network error
247251
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
248252
done := make(chan error, 1)
249-
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
253+
_ = rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
250254
select {
251255
case <-rm.ctx.Done():
252256
case <-done:
@@ -256,7 +260,7 @@ func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID)
256260
// TerminateRequest indicates a request has finished sending data and should no longer be tracked
257261
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
258262
done := make(chan struct{}, 1)
259-
rm.send(&terminateRequestMessage{requestID, done}, nil)
263+
_ = rm.send(&terminateRequestMessage{requestID, done}, nil)
260264
select {
261265
case <-rm.ctx.Done():
262266
case <-done:
@@ -266,7 +270,7 @@ func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
266270
// PeerState gets current state of the outgoing responses for a given peer
267271
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
268272
response := make(chan peerstate.PeerState)
269-
rm.send(&peerStateMessage{p, response}, nil)
273+
_ = rm.send(&peerStateMessage{p, response}, nil)
270274
select {
271275
case <-rm.ctx.Done():
272276
return peerstate.PeerState{}
@@ -275,11 +279,20 @@ func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
275279
}
276280
}
277281

278-
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
282+
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) error {
283+
// prioritize cancelled context
284+
select {
285+
case <-done:
286+
return errors.New("unable to send message before cancellation")
287+
default:
288+
}
279289
select {
280290
case <-rm.ctx.Done():
291+
return rm.ctx.Err()
281292
case <-done:
293+
return errors.New("unable to send message before cancellation")
282294
case rm.messages <- message:
295+
return nil
283296
}
284297
}
285298

0 commit comments

Comments
 (0)