@@ -6,7 +6,7 @@ import delay from 'delay'
6
6
import all from 'it-all'
7
7
import drain from 'it-drain'
8
8
import sinon from 'sinon'
9
- import { MessageType } from '../src/index.js'
9
+ import { MessageType , type QueryEvent } from '../src/index.js'
10
10
import * as kadUtils from '../src/utils.js'
11
11
import { createValues } from './utils/create-values.js'
12
12
import { sortDHTs } from './utils/sort-closest-peers.js'
@@ -253,4 +253,105 @@ describe('content routing', () => {
253
253
} , { } ) )
254
254
expect ( provs ) . to . have . length ( 1 )
255
255
} )
256
+
257
+ it ( 'aborts provide operation when abort signal is triggered before starting' , async function ( ) {
258
+ this . timeout ( 20 * 1000 )
259
+
260
+ const dhts = await sortDHTs ( await Promise . all ( [
261
+ testDHT . spawn ( ) ,
262
+ testDHT . spawn ( ) ,
263
+ testDHT . spawn ( ) ,
264
+ testDHT . spawn ( )
265
+ ] ) , await kadUtils . convertBuffer ( cid . multihash . bytes ) )
266
+
267
+ // Spy on network.sendMessage to verify it's not called after abort
268
+ const sendMessageSpy = sinon . spy ( dhts [ 3 ] . network , 'sendMessage' )
269
+
270
+ // Connect peers
271
+ await Promise . all ( [
272
+ testDHT . connect ( dhts [ 0 ] , dhts [ 1 ] ) ,
273
+ testDHT . connect ( dhts [ 1 ] , dhts [ 2 ] ) ,
274
+ testDHT . connect ( dhts [ 2 ] , dhts [ 3 ] )
275
+ ] )
276
+
277
+ const controller = new AbortController ( )
278
+ controller . abort ( )
279
+
280
+ const generator = dhts [ 3 ] . provide ( cid , { signal : controller . signal } )
281
+ await expect ( all ( generator ) ) . to . eventually . be . rejected
282
+ . with . property ( 'message' ) . that . include ( 'aborted' )
283
+
284
+ expect ( sendMessageSpy . called ) . to . be . false ( 'sendMessage should not be called when aborted' )
285
+ } )
286
+
287
+ it ( 'properly terminates generator when a non-immediate abort signal is triggered' , async function ( ) {
288
+ this . timeout ( 20 * 1000 )
289
+
290
+ const dhts = await sortDHTs ( await Promise . all ( [
291
+ testDHT . spawn ( ) ,
292
+ testDHT . spawn ( ) ,
293
+ testDHT . spawn ( ) ,
294
+ testDHT . spawn ( )
295
+ ] ) , await kadUtils . convertBuffer ( cid . multihash . bytes ) )
296
+
297
+ // Connect peers
298
+ await Promise . all ( [
299
+ testDHT . connect ( dhts [ 0 ] , dhts [ 1 ] ) ,
300
+ testDHT . connect ( dhts [ 1 ] , dhts [ 2 ] ) ,
301
+ testDHT . connect ( dhts [ 2 ] , dhts [ 3 ] )
302
+ ] )
303
+
304
+ const sendMessageSpy = sinon . spy ( dhts [ 3 ] . network , 'sendMessage' )
305
+
306
+ const controller = new AbortController ( )
307
+
308
+ // Start the provide operation
309
+ const generator = dhts [ 3 ] . provide ( cid , { signal : controller . signal } )
310
+
311
+ // We want to push the generator manually to control timing
312
+ const reader = async ( ) : Promise < { results : QueryEvent [ ] , aborted : boolean } > => {
313
+ const results = [ ]
314
+ try {
315
+ for await ( const event of generator ) {
316
+ results . push ( event )
317
+ // After we get the first few results, abort the operation
318
+ if ( results . length === 2 ) {
319
+ controller . abort ( )
320
+ // This delay simulates an onward consumer performing work before
321
+ // accepting another value from the generator
322
+ await delay ( 50 )
323
+ }
324
+ }
325
+ } catch ( err ) {
326
+ // We expect an abort error here
327
+ expect ( err ) . to . have . property ( 'message' ) . that . include ( 'abort' )
328
+ return { results, aborted : true }
329
+ }
330
+ return { results, aborted : false }
331
+ }
332
+
333
+ const { results, aborted } = await reader ( )
334
+
335
+ // We should have aborted
336
+ expect ( aborted ) . to . be . true ( 'Generator should have thrown an abort error' )
337
+
338
+ // We should have received some events before the abort
339
+ expect ( results . length ) . to . be . greaterThan ( 0 , 'Should have received some events before abort' )
340
+
341
+ // After aborting, if we try to get more from the generator, it should be
342
+ // done. Testing this requires using the original generator reference, but
343
+ // we've already drained it. So instead we check side effects to confirm the
344
+ // operation stopped.
345
+
346
+ // Wait a reasonable time for any pending operations to complete
347
+ await delay ( 500 )
348
+
349
+ // Check that no new network calls were made after the abort
350
+ const initialMessageCalls = sendMessageSpy . callCount
351
+ await delay ( 200 )
352
+
353
+ // The number of calls should not have increased
354
+ expect ( sendMessageSpy . callCount ) . to . equal ( initialMessageCalls ,
355
+ 'No new network calls should be made after abort' )
356
+ } )
256
357
} )
0 commit comments