@@ -25,7 +25,8 @@ struct Slot<T> {
25
25
/// The current stamp.
26
26
stamp : AtomicUsize ,
27
27
28
- /// The message in this slot.
28
+ /// The message in this slot. Either read out in `read` or dropped through
29
+ /// `discard_all_messages`.
29
30
msg : UnsafeCell < MaybeUninit < T > > ,
30
31
}
31
32
@@ -439,21 +440,99 @@ impl<T> Channel<T> {
439
440
Some ( self . cap )
440
441
}
441
442
442
- /// Disconnects the channel and wakes up all blocked senders and receivers.
443
+ /// Disconnects senders and wakes up all blocked receivers.
443
444
///
444
445
/// Returns `true` if this call disconnected the channel.
445
- pub ( crate ) fn disconnect ( & self ) -> bool {
446
+ pub ( crate ) fn disconnect_senders ( & self ) -> bool {
446
447
let tail = self . tail . fetch_or ( self . mark_bit , Ordering :: SeqCst ) ;
447
448
448
449
if tail & self . mark_bit == 0 {
449
- self . senders . disconnect ( ) ;
450
450
self . receivers . disconnect ( ) ;
451
451
true
452
452
} else {
453
453
false
454
454
}
455
455
}
456
456
457
+ /// Disconnects receivers and wakes up all blocked senders.
458
+ ///
459
+ /// Returns `true` if this call disconnected the channel.
460
+ ///
461
+ /// # Safety
462
+ /// May only be called once upon dropping the last receiver. The
463
+ /// destruction of all other receivers must have been observed with acquire
464
+ /// ordering or stronger.
465
+ pub ( crate ) unsafe fn disconnect_receivers ( & self ) -> bool {
466
+ let tail = self . tail . fetch_or ( self . mark_bit , Ordering :: SeqCst ) ;
467
+ let disconnected = if tail & self . mark_bit == 0 {
468
+ self . senders . disconnect ( ) ;
469
+ true
470
+ } else {
471
+ false
472
+ } ;
473
+
474
+ self . discard_all_messages ( tail) ;
475
+ disconnected
476
+ }
477
+
478
+ /// Discards all messages.
479
+ ///
480
+ /// `tail` should be the current (and therefore last) value of `tail`.
481
+ ///
482
+ /// # Panicking
483
+ /// If a destructor panics, the remaining messages are leaked, matching the
484
+ /// behaviour of the unbounded channel.
485
+ ///
486
+ /// # Safety
487
+ /// This method must only be called when dropping the last receiver. The
488
+ /// destruction of all other receivers must have been observed with acquire
489
+ /// ordering or stronger.
490
+ unsafe fn discard_all_messages ( & self , tail : usize ) {
491
+ debug_assert ! ( self . is_disconnected( ) ) ;
492
+
493
+ // Only receivers modify `head`, so since we are the last one,
494
+ // this value will not change and will not be observed (since
495
+ // no new messages can be sent after disconnection).
496
+ let mut head = self . head . load ( Ordering :: Relaxed ) ;
497
+ let tail = tail & !self . mark_bit ;
498
+
499
+ let backoff = Backoff :: new ( ) ;
500
+ loop {
501
+ // Deconstruct the head.
502
+ let index = head & ( self . mark_bit - 1 ) ;
503
+ let lap = head & !( self . one_lap - 1 ) ;
504
+
505
+ // Inspect the corresponding slot.
506
+ debug_assert ! ( index < self . buffer. len( ) ) ;
507
+ let slot = unsafe { self . buffer . get_unchecked ( index) } ;
508
+ let stamp = slot. stamp . load ( Ordering :: Acquire ) ;
509
+
510
+ // If the stamp is ahead of the head by 1, we may drop the message.
511
+ if head + 1 == stamp {
512
+ head = if index + 1 < self . cap {
513
+ // Same lap, incremented index.
514
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
515
+ head + 1
516
+ } else {
517
+ // One lap forward, index wraps around to zero.
518
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
519
+ lap. wrapping_add ( self . one_lap )
520
+ } ;
521
+
522
+ unsafe {
523
+ ( * slot. msg . get ( ) ) . assume_init_drop ( ) ;
524
+ }
525
+ // If the tail equals the head, that means the channel is empty.
526
+ } else if tail == head {
527
+ return ;
528
+ // Otherwise, a sender is about to write into the slot, so we need
529
+ // to wait for it to update the stamp.
530
+ } else {
531
+ backoff. spin_heavy ( ) ;
532
+ }
533
+ }
534
+ }
535
+
457
536
/// Returns `true` if the channel is disconnected.
458
537
pub ( crate ) fn is_disconnected ( & self ) -> bool {
459
538
self . tail . load ( Ordering :: SeqCst ) & self . mark_bit != 0
@@ -483,23 +562,3 @@ impl<T> Channel<T> {
483
562
head. wrapping_add ( self . one_lap ) == tail & !self . mark_bit
484
563
}
485
564
}
486
-
487
- impl < T > Drop for Channel < T > {
488
- fn drop ( & mut self ) {
489
- // Get the index of the head.
490
- let hix = self . head . load ( Ordering :: Relaxed ) & ( self . mark_bit - 1 ) ;
491
-
492
- // Loop over all slots that hold a message and drop them.
493
- for i in 0 ..self . len ( ) {
494
- // Compute the index of the next slot holding a message.
495
- let index = if hix + i < self . cap { hix + i } else { hix + i - self . cap } ;
496
-
497
- unsafe {
498
- debug_assert ! ( index < self . buffer. len( ) ) ;
499
- let slot = self . buffer . get_unchecked_mut ( index) ;
500
- let msg = & mut * slot. msg . get ( ) ;
501
- msg. as_mut_ptr ( ) . drop_in_place ( ) ;
502
- }
503
- }
504
- }
505
- }
0 commit comments