Skip to content

Commit 0b12331

Browse files
committed
Handle context cancellation properly (ipfs#428)
* fix(cancellation): handle message cancellation properly a previous fix ipfs#391, attempted to address a lock that occurred on context cancel. however in doing so, it introduced a new lock. essentially, if a message was not sent to the requestmanager/responsemanager go routine, waiting for a response to that message could last indefinitely. the previous fix therefore stopped waiting when the calling context cancelled. However once a message reaches the go routine of the requestmanager/responsemanager, it's important that it's processed to completion, so that the the message loop doesn't lock. The proper fix is to detect when the message is sent successfully, and if so, wait for it to be processed. If it isn't sent, we can safely abort the go routine immediately. * fix(race): resolve race condition with test responses
1 parent 975c0f7 commit 0b12331

File tree

2 files changed

+68
-23
lines changed

2 files changed

+68
-23
lines changed

requestmanager/client.go

+34-10
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ func (rm *RequestManager) NewRequest(ctx context.Context,
186186

187187
inProgressRequestChan := make(chan inProgressRequest)
188188

189-
rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
189+
err := rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
190+
if err != nil {
191+
return rm.emptyResponse()
192+
}
190193
var receivedInProgressRequest inProgressRequest
191194
select {
192195
case <-rm.ctx.Done():
@@ -280,7 +283,10 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
280283
// CancelRequest cancels the given request ID and waits for the request to terminate
281284
func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
282285
terminated := make(chan error, 1)
283-
rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
286+
err := rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
287+
if err != nil {
288+
return err
289+
}
284290
select {
285291
case <-rm.ctx.Done():
286292
return errors.New("context cancelled")
@@ -295,14 +301,17 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,
295301
responses []gsmsg.GraphSyncResponse,
296302
blks []blocks.Block) {
297303

298-
rm.send(&processResponsesMessage{p, responses, blks}, nil)
304+
_ = rm.send(&processResponsesMessage{p, responses, blks}, nil)
299305
}
300306

301307
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
302308
// Can also send extensions with unpause
303309
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
304310
response := make(chan error, 1)
305-
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
311+
err := rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
312+
if err != nil {
313+
return err
314+
}
306315
select {
307316
case <-rm.ctx.Done():
308317
return errors.New("context cancelled")
@@ -314,7 +323,10 @@ func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsyn
314323
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
315324
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
316325
response := make(chan error, 1)
317-
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
326+
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
327+
if err != nil {
328+
return err
329+
}
318330
select {
319331
case <-rm.ctx.Done():
320332
return errors.New("context cancelled")
@@ -326,7 +338,10 @@ func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.
326338
// UpdateRequest updates an in progress request
327339
func (rm *RequestManager) UpdateRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
328340
response := make(chan error, 1)
329-
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
341+
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
342+
if err != nil {
343+
return err
344+
}
330345
select {
331346
case <-rm.ctx.Done():
332347
return errors.New("context cancelled")
@@ -337,13 +352,13 @@ func (rm *RequestManager) UpdateRequest(ctx context.Context, requestID graphsync
337352

338353
// GetRequestTask gets data for the given task in the request queue
339354
func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, requestExecutionChan chan executor.RequestTask) {
340-
rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
355+
_ = rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
341356
}
342357

343358
// ReleaseRequestTask releases a task request the requestQueue
344359
func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) {
345360
done := make(chan struct{}, 1)
346-
rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
361+
_ = rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
347362
select {
348363
case <-rm.ctx.Done():
349364
case <-done:
@@ -353,7 +368,7 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err
353368
// PeerState gets stats on all outgoing requests for a given peer
354369
func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState {
355370
response := make(chan peerstate.PeerState)
356-
rm.send(&peerStateMessage{p, response}, nil)
371+
_ = rm.send(&peerStateMessage{p, response}, nil)
357372
select {
358373
case <-rm.ctx.Done():
359374
return peerstate.PeerState{}
@@ -381,11 +396,20 @@ func (rm *RequestManager) Shutdown() {
381396
rm.cancel()
382397
}
383398

384-
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) {
399+
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) error {
400+
// prioritize cancelled context
401+
select {
402+
case <-done:
403+
return errors.New("unable to send message before cancellation")
404+
default:
405+
}
385406
select {
386407
case <-rm.ctx.Done():
408+
return rm.ctx.Err()
387409
case <-done:
410+
return errors.New("unable to send message before cancellation")
388411
case rm.messages <- message:
412+
return nil
389413
}
390414
}
391415

responsemanager/client.go

+34-13
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,16 @@ func New(ctx context.Context,
156156

157157
// ProcessRequests processes incoming requests for the given peer
158158
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
159-
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
159+
_ = rm.send(&processRequestsMessage{p, requests}, ctx.Done())
160160
}
161161

162162
// UnpauseResponse unpauses a response that was previously paused
163163
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
164164
response := make(chan error, 1)
165-
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
165+
err := rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
166+
if err != nil {
167+
return err
168+
}
166169
select {
167170
case <-rm.ctx.Done():
168171
return errors.New("context cancelled")
@@ -174,7 +177,10 @@ func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphs
174177
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
175178
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
176179
response := make(chan error, 1)
177-
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
180+
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
181+
if err != nil {
182+
return err
183+
}
178184
select {
179185
case <-rm.ctx.Done():
180186
return errors.New("context cancelled")
@@ -186,7 +192,10 @@ func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsyn
186192
// CancelResponse cancels an in progress response
187193
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
188194
response := make(chan error, 1)
189-
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
195+
err := rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
196+
if err != nil {
197+
return err
198+
}
190199
select {
191200
case <-rm.ctx.Done():
192201
return errors.New("context cancelled")
@@ -198,7 +207,10 @@ func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsy
198207
// UpdateRequest updates an in progress response
199208
func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
200209
response := make(chan error, 1)
201-
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
210+
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
211+
if err != nil {
212+
return err
213+
}
202214
select {
203215
case <-rm.ctx.Done():
204216
return errors.New("context cancelled")
@@ -210,7 +222,7 @@ func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsy
210222
// Synchronize is a utility method that blocks until all current messages are processed
211223
func (rm *ResponseManager) synchronize() {
212224
sync := make(chan error)
213-
rm.send(&synchronizeMessage{sync}, nil)
225+
_ = rm.send(&synchronizeMessage{sync}, nil)
214226
select {
215227
case <-rm.ctx.Done():
216228
case <-sync:
@@ -219,18 +231,18 @@ func (rm *ResponseManager) synchronize() {
219231

220232
// StartTask starts the given task from the peer task queue
221233
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
222-
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
234+
_ = rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
223235
}
224236

225237
// GetUpdates is called to read pending updates for a task and clear them
226238
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
227-
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
239+
_ = rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
228240
}
229241

230242
// FinishTask marks a task from the task queue as done
231243
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
232244
done := make(chan struct{}, 1)
233-
rm.send(&finishTaskRequest{task, p, err, done}, nil)
245+
_ = rm.send(&finishTaskRequest{task, p, err, done}, nil)
234246
select {
235247
case <-rm.ctx.Done():
236248
case <-done:
@@ -240,7 +252,7 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error)
240252
// CloseWithNetworkError closes a request due to a network error
241253
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
242254
done := make(chan error, 1)
243-
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
255+
_ = rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
244256
select {
245257
case <-rm.ctx.Done():
246258
case <-done:
@@ -250,7 +262,7 @@ func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID)
250262
// TerminateRequest indicates a request has finished sending data and should no longer be tracked
251263
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
252264
done := make(chan struct{}, 1)
253-
rm.send(&terminateRequestMessage{requestID, done}, nil)
265+
_ = rm.send(&terminateRequestMessage{requestID, done}, nil)
254266
select {
255267
case <-rm.ctx.Done():
256268
case <-done:
@@ -260,7 +272,7 @@ func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
260272
// PeerState gets current state of the outgoing responses for a given peer
261273
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
262274
response := make(chan peerstate.PeerState)
263-
rm.send(&peerStateMessage{p, response}, nil)
275+
_ = rm.send(&peerStateMessage{p, response}, nil)
264276
select {
265277
case <-rm.ctx.Done():
266278
return peerstate.PeerState{}
@@ -269,11 +281,20 @@ func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
269281
}
270282
}
271283

272-
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
284+
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) error {
285+
// prioritize cancelled context
286+
select {
287+
case <-done:
288+
return errors.New("unable to send message before cancellation")
289+
default:
290+
}
273291
select {
274292
case <-rm.ctx.Done():
293+
return rm.ctx.Err()
275294
case <-done:
295+
return errors.New("unable to send message before cancellation")
276296
case rm.messages <- message:
297+
return nil
277298
}
278299
}
279300

0 commit comments

Comments
 (0)