@@ -952,15 +952,22 @@ func (c *dtChannel) open(
952
952
if c .gsKey != nil {
953
953
// Cancel the existing graphsync request
954
954
completed := c .completed
955
- err := c .cancelRequest (ctx )
955
+ errch := c .cancelRequest (ctx )
956
+
957
+ // Wait for the complete callback to be called
958
+ err := waitForCompleteHook (ctx , completed )
956
959
if err != nil {
957
- return nil , xerrors .Errorf ("restarting graphsync request: %w" , err )
960
+ return nil , xerrors .Errorf ("%s: waiting for cancelled graphsync request to complete : %w" , chid , err )
958
961
}
959
962
960
- // Wait for the cancel to go through
961
- err = waitForCancelComplete (ctx , completed )
963
+ // Wait for the cancel request method to complete
964
+ select {
965
+ case err = <- errch :
966
+ case <- ctx .Done ():
967
+ err = xerrors .Errorf ("timed out waiting for graphsync request to be cancelled" )
968
+ }
962
969
if err != nil {
963
- return nil , xerrors .Errorf ("waiting for cancelled graphsync request to complete : %w" , err )
970
+ return nil , xerrors .Errorf ("%s: restarting graphsync request: %w" , chid , err )
964
971
}
965
972
}
966
973
@@ -1003,7 +1010,7 @@ func (c *dtChannel) open(
1003
1010
}, nil
1004
1011
}
1005
1012
1006
- func waitForCancelComplete (ctx context.Context , completed chan struct {}) error {
1013
+ func waitForCompleteHook (ctx context.Context , completed chan struct {}) error {
1007
1014
// Wait for the cancel to propagate through to graphsync, and for
1008
1015
// the graphsync request to complete
1009
1016
select {
@@ -1139,30 +1146,30 @@ func (c *dtChannel) resume(msg datatransfer.Message) error {
1139
1146
}
1140
1147
1141
1148
func (c * dtChannel ) close (ctx context.Context ) error {
1149
+ var errch chan error
1142
1150
c .lk .Lock ()
1143
- defer c .lk .Unlock ()
1144
-
1145
- // Check if the channel was already cancelled
1146
- if c .gsKey == nil {
1147
- return nil
1148
- }
1149
-
1150
- // If it's a graphsync request
1151
- if c .gsKey .p == c .peerID {
1152
- // Cancel the request
1153
- return c .cancelRequest (ctx )
1151
+ {
1152
+ // Check if the channel was already cancelled
1153
+ if c .gsKey != nil {
1154
+ // Check whether it's a graphsync request or response
1155
+ if c .gsKey .p == c .peerID {
1156
+ // Cancel the request
1157
+ errch = c .cancelRequest (ctx )
1158
+ } else {
1159
+ // Cancel the response
1160
+ errch = c .cancelResponse ()
1161
+ }
1162
+ }
1154
1163
}
1164
+ c .lk .Unlock ()
1155
1165
1156
- // It's a graphsync response
1157
-
1158
- // If the requester already cancelled, bail out
1159
- if c .requesterCancelled {
1160
- return nil
1166
+ // Wait for the cancel message to complete
1167
+ select {
1168
+ case err := <- errch :
1169
+ return err
1170
+ case <- ctx .Done ():
1171
+ return ctx .Err ()
1161
1172
}
1162
-
1163
- // Cancel the response
1164
- log .Debugf ("%s: cancelling response" , c .channelID )
1165
- return c .gs .CancelResponse (c .gsKey .p , c .gsKey .requestID )
1166
1173
}
1167
1174
1168
1175
// Called when the responder gets a cancel message from the requester
@@ -1217,34 +1224,80 @@ func (c *dtChannel) cleanup() {
1217
1224
}
1218
1225
1219
1226
func (c * dtChannel ) shutdown (ctx context.Context ) error {
1227
+ // Cancel the graphsync request
1220
1228
c .lk .Lock ()
1221
- defer c .lk .Unlock ()
1229
+ errch := c .cancelRequest (ctx )
1230
+ c .lk .Unlock ()
1222
1231
1223
- // Cancel the graphsync request
1224
- return c .cancelRequest (ctx )
1232
+ // Wait for the cancel message to complete
1233
+ select {
1234
+ case err := <- errch :
1235
+ return err
1236
+ case <- ctx .Done ():
1237
+ return ctx .Err ()
1238
+ }
1225
1239
}
1226
1240
1227
1241
// Cancel the graphsync request.
1228
1242
// Note: must be called under the lock.
1229
- func (c * dtChannel ) cancelRequest (ctx context.Context ) error {
1243
+ func (c * dtChannel ) cancelRequest (ctx context.Context ) chan error {
1244
+ errch := make (chan error , 1 )
1245
+
1230
1246
// Check that the request has not already been cancelled
1231
1247
if c .gsKey == nil {
1232
- return nil
1248
+ errch <- nil
1249
+ return errch
1233
1250
}
1234
1251
1235
- log .Debugf ("%s: cancelling request" , c .channelID )
1236
- err := c .gs .CancelRequest (ctx , c .gsKey .requestID )
1237
- if err != nil {
1252
+ // Clear the graphsync key to indicate that the request has been cancelled
1253
+ gsKey := c .gsKey
1254
+ c .gsKey = nil
1255
+
1256
+ go func () {
1257
+ log .Debugf ("%s: cancelling request" , c .channelID )
1258
+ err := c .gs .CancelRequest (ctx , gsKey .requestID )
1259
+
1238
1260
// Ignore "request not found" errors
1239
- if ! xerrors .Is (graphsync.RequestNotFoundErr {}, err ) {
1240
- return xerrors .Errorf ("cancelling graphsync request for channel %s: %w" , c .channelID , err )
1261
+ if err != nil && ! xerrors .Is (graphsync.RequestNotFoundErr {}, err ) {
1262
+ errch <- xerrors .Errorf ("cancelling graphsync request for channel %s: %w" , c .channelID , err )
1263
+ } else {
1264
+ errch <- nil
1241
1265
}
1266
+ }()
1267
+
1268
+ return errch
1269
+ }
1270
+
1271
+ func (c * dtChannel ) cancelResponse () chan error {
1272
+ errch := make (chan error , 1 )
1273
+
1274
+ // Check if the requester already sent a cancel message,
1275
+ // or the response has already been cancelled
1276
+ if c .requesterCancelled || c .gsKey == nil {
1277
+ errch <- nil
1278
+ return errch
1242
1279
}
1243
1280
1244
- // Clear the graphsync key to indicate that the request has been cancelled
1281
+ // Clear the graphsync key to indicate that the response has been cancelled
1282
+ gsKey := c .gsKey
1245
1283
c .gsKey = nil
1246
1284
1247
- return nil
1285
+ // Cancel the response in a go-routine to avoid locking when the channel's
1286
+ // event queue is drained (potentially calling hooks which take the channel
1287
+ // lock)
1288
+ go func () {
1289
+ log .Debugf ("%s: cancelling response" , c .channelID )
1290
+ err := c .gs .CancelResponse (gsKey .p , gsKey .requestID )
1291
+
1292
+ // Ignore "request not found" errors
1293
+ if err != nil && ! xerrors .Is (graphsync.RequestNotFoundErr {}, err ) {
1294
+ errch <- xerrors .Errorf ("%s: cancelling response: %w" , c .channelID , err )
1295
+ } else {
1296
+ errch <- nil
1297
+ }
1298
+ }()
1299
+
1300
+ return errch
1248
1301
}
1249
1302
1250
1303
// Used in graphsync callbacks to map from graphsync request to the
0 commit comments