Skip to content

Commit 5b921b5

Browse files
committed
fix(cancellation): handle message cancellation properly
A previous fix, #391, attempted to address a lock that occurred on context cancel. However, in doing so, it introduced a new lock. Essentially, if a message was not sent to the requestmanager/responsemanager go routine, waiting for a response to that message could last indefinitely. The previous fix therefore stopped waiting when the calling context cancelled. However, once a message reaches the go routine of the requestmanager/responsemanager, it's important that it's processed to completion, so that the message loop doesn't lock. The proper fix is to detect when the message is sent successfully, and if so, wait for it to be processed. If it isn't sent, we can safely abort the go routine immediately.
1 parent 6a2fa5c commit 5b921b5

File tree

2 files changed

+68
-41
lines changed

2 files changed

+68
-41
lines changed

requestmanager/client.go

+34-20
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,14 @@ func (rm *RequestManager) NewRequest(ctx context.Context,
187187

188188
inProgressRequestChan := make(chan inProgressRequest)
189189

190-
rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
190+
err := rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
191+
if err != nil {
192+
return rm.emptyResponse()
193+
}
191194
var receivedInProgressRequest inProgressRequest
192195
select {
193196
case <-rm.ctx.Done():
194197
return rm.emptyResponse()
195-
case <-ctx.Done():
196-
return rm.emptyResponse()
197198
case receivedInProgressRequest = <-inProgressRequestChan:
198199
}
199200

@@ -283,12 +284,13 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
283284
// CancelRequest cancels the given request ID and waits for the request to terminate
284285
func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
285286
terminated := make(chan error, 1)
286-
rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
287+
err := rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
288+
if err != nil {
289+
return err
290+
}
287291
select {
288292
case <-rm.ctx.Done():
289293
return errors.New("context cancelled")
290-
case <-ctx.Done():
291-
return ctx.Err()
292294
case err := <-terminated:
293295
return err
294296
}
@@ -300,19 +302,20 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,
300302
responses []gsmsg.GraphSyncResponse,
301303
blks []blocks.Block) {
302304

303-
rm.send(&processResponsesMessage{p, responses, blks}, nil)
305+
_ = rm.send(&processResponsesMessage{p, responses, blks}, nil)
304306
}
305307

306308
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
307309
// Can also send extensions with unpause
308310
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
309311
response := make(chan error, 1)
310-
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
312+
err := rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
313+
if err != nil {
314+
return err
315+
}
311316
select {
312317
case <-rm.ctx.Done():
313318
return errors.New("context cancelled")
314-
case <-ctx.Done():
315-
return ctx.Err()
316319
case err := <-response:
317320
return err
318321
}
@@ -321,12 +324,13 @@ func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsyn
321324
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
322325
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
323326
response := make(chan error, 1)
324-
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
327+
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
328+
if err != nil {
329+
return err
330+
}
325331
select {
326332
case <-rm.ctx.Done():
327333
return errors.New("context cancelled")
328-
case <-ctx.Done():
329-
return ctx.Err()
330334
case err := <-response:
331335
return err
332336
}
@@ -335,26 +339,27 @@ func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.
335339
// UpdateRequest updates an in progress request
336340
func (rm *RequestManager) UpdateRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
337341
response := make(chan error, 1)
338-
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
342+
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
343+
if err != nil {
344+
return err
345+
}
339346
select {
340347
case <-rm.ctx.Done():
341348
return errors.New("context cancelled")
342-
case <-ctx.Done():
343-
return ctx.Err()
344349
case err := <-response:
345350
return err
346351
}
347352
}
348353

349354
// GetRequestTask gets data for the given task in the request queue
350355
func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, requestExecutionChan chan executor.RequestTask) {
351-
rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
356+
_ = rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
352357
}
353358

354359
// ReleaseRequestTask releases a task request the requestQueue
355360
func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) {
356361
done := make(chan struct{}, 1)
357-
rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
362+
_ = rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
358363
select {
359364
case <-rm.ctx.Done():
360365
case <-done:
@@ -364,7 +369,7 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err
364369
// PeerState gets stats on all outgoing requests for a given peer
365370
func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState {
366371
response := make(chan peerstate.PeerState)
367-
rm.send(&peerStateMessage{p, response}, nil)
372+
_ = rm.send(&peerStateMessage{p, response}, nil)
368373
select {
369374
case <-rm.ctx.Done():
370375
return peerstate.PeerState{}
@@ -392,11 +397,20 @@ func (rm *RequestManager) Shutdown() {
392397
rm.cancel()
393398
}
394399

395-
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) {
400+
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) error {
401+
// prioritize cancelled context
402+
select {
403+
case <-done:
404+
return errors.New("unable to send message before cancellation")
405+
default:
406+
}
396407
select {
397408
case <-rm.ctx.Done():
409+
return rm.ctx.Err()
398410
case <-done:
411+
return errors.New("unable to send message before cancellation")
399412
case rm.messages <- message:
413+
return nil
400414
}
401415
}
402416

responsemanager/client.go

+34-21
Original file line numberDiff line numberDiff line change
@@ -157,18 +157,19 @@ func New(ctx context.Context,
157157

158158
// ProcessRequests processes incoming requests for the given peer
159159
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
160-
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
160+
_ = rm.send(&processRequestsMessage{p, requests}, ctx.Done())
161161
}
162162

163163
// UnpauseResponse unpauses a response that was previously paused
164164
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
165165
response := make(chan error, 1)
166-
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
166+
err := rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
167+
if err != nil {
168+
return err
169+
}
167170
select {
168171
case <-rm.ctx.Done():
169172
return errors.New("context cancelled")
170-
case <-ctx.Done():
171-
return ctx.Err()
172173
case err := <-response:
173174
return err
174175
}
@@ -177,12 +178,13 @@ func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphs
177178
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
178179
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
179180
response := make(chan error, 1)
180-
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
181+
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
182+
if err != nil {
183+
return err
184+
}
181185
select {
182186
case <-rm.ctx.Done():
183187
return errors.New("context cancelled")
184-
case <-ctx.Done():
185-
return ctx.Err()
186188
case err := <-response:
187189
return err
188190
}
@@ -191,12 +193,13 @@ func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsyn
191193
// CancelResponse cancels an in progress response
192194
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
193195
response := make(chan error, 1)
194-
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
196+
err := rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
197+
if err != nil {
198+
return err
199+
}
195200
select {
196201
case <-rm.ctx.Done():
197202
return errors.New("context cancelled")
198-
case <-ctx.Done():
199-
return ctx.Err()
200203
case err := <-response:
201204
return err
202205
}
@@ -205,12 +208,13 @@ func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsy
205208
// UpdateResponse updates an in progress response
206209
func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
207210
response := make(chan error, 1)
208-
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
211+
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
212+
if err != nil {
213+
return err
214+
}
209215
select {
210216
case <-rm.ctx.Done():
211217
return errors.New("context cancelled")
212-
case <-ctx.Done():
213-
return ctx.Err()
214218
case err := <-response:
215219
return err
216220
}
@@ -219,7 +223,7 @@ func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsy
219223
// Synchronize is a utility method that blocks until all current messages are processed
220224
func (rm *ResponseManager) synchronize() {
221225
sync := make(chan error)
222-
rm.send(&synchronizeMessage{sync}, nil)
226+
_ = rm.send(&synchronizeMessage{sync}, nil)
223227
select {
224228
case <-rm.ctx.Done():
225229
case <-sync:
@@ -228,18 +232,18 @@ func (rm *ResponseManager) synchronize() {
228232

229233
// StartTask starts the given task from the peer task queue
230234
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
231-
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
235+
_ = rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
232236
}
233237

234238
// GetUpdates is called to read pending updates for a task and clear them
235239
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
236-
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
240+
_ = rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
237241
}
238242

239243
// FinishTask marks a task from the task queue as done
240244
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
241245
done := make(chan struct{}, 1)
242-
rm.send(&finishTaskRequest{task, p, err, done}, nil)
246+
_ = rm.send(&finishTaskRequest{task, p, err, done}, nil)
243247
select {
244248
case <-rm.ctx.Done():
245249
case <-done:
@@ -249,7 +253,7 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error)
249253
// CloseWithNetworkError closes a request due to a network error
250254
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
251255
done := make(chan error, 1)
252-
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
256+
_ = rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
253257
select {
254258
case <-rm.ctx.Done():
255259
case <-done:
@@ -259,7 +263,7 @@ func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID)
259263
// TerminateRequest indicates a request has finished sending data and should no longer be tracked
260264
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
261265
done := make(chan struct{}, 1)
262-
rm.send(&terminateRequestMessage{requestID, done}, nil)
266+
_ = rm.send(&terminateRequestMessage{requestID, done}, nil)
263267
select {
264268
case <-rm.ctx.Done():
265269
case <-done:
@@ -269,7 +273,7 @@ func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
269273
// PeerState gets current state of the outgoing responses for a given peer
270274
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
271275
response := make(chan peerstate.PeerState)
272-
rm.send(&peerStateMessage{p, response}, nil)
276+
_ = rm.send(&peerStateMessage{p, response}, nil)
273277
select {
274278
case <-rm.ctx.Done():
275279
return peerstate.PeerState{}
@@ -278,11 +282,20 @@ func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
278282
}
279283
}
280284

281-
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
285+
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) error {
286+
// prioritize cancelled context
287+
select {
288+
case <-done:
289+
return errors.New("unable to send message before cancellation")
290+
default:
291+
}
282292
select {
283293
case <-rm.ctx.Done():
294+
return rm.ctx.Err()
284295
case <-done:
296+
return errors.New("unable to send message before cancellation")
285297
case rm.messages <- message:
298+
return nil
286299
}
287300
}
288301

0 commit comments

Comments
 (0)