
Commit 5fe4437

feat(subscriber): spill callsites into hash set (#97)
This changes the `Callsites` behavior when the array of callsites is full. Currently, we panic in this case. This means that we are quite generous with array sizes, for cases where, e.g., multiple async runtimes are in use. However, being more generous with the array's length makes the linear search performance worse.

This branch replaces the panicking behavior with a spillover behavior. Once the array of callsites is full, we will now store any additional callsites in a `HashSet`, rather than panicking. This means we can make the arrays a bit shorter, and (perhaps more importantly) it means we will no longer panic in the (rare) case where an app contains a big pile of interesting callsites.

The spillover `HashSet` is protected by a `RwLock`, which is kind of a bummer, since it may be locked when checking if a span/event is in the set of callsites we care about (but only if we have spilled over). However, it should be _contended_ only very rarely, since writes only occur when registering a new callsite.

I added an optional `parking_lot` feature to use `parking_lot`'s `RwLock` implementation, which likely offers better performance than `std`'s lock (especially when uncontended, which this lock often is). The feature is disabled by default, for users who don't want the additional dependency.

Signed-off-by: Eliza Weisman <[email protected]>
1 parent e979f95 commit 5fe4437
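
To make the tradeoff concrete, here is a minimal, self-contained sketch of the "fixed array first, hash set on overflow" pattern this commit adopts. It is not the crate's actual code (the real `Callsites` type stores `&'static Metadata<'static>` pointers and lives in `callsites.rs` below); the `SpillSet` type and its `usize` payload are illustrative only.

    use std::collections::HashSet;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::RwLock;

    /// Illustrative only: a set with a lock-free, fixed-size fast path that
    /// spills into a locked `HashSet` instead of panicking when the array fills.
    struct SpillSet<const MAX: usize> {
        fast: [AtomicUsize; MAX],      // fast path; 0 means "empty slot"
        len: AtomicUsize,              // number of slots ever claimed
        spill: RwLock<HashSet<usize>>, // overflow storage, written to rarely
    }

    impl<const MAX: usize> SpillSet<MAX> {
        fn new() -> Self {
            Self {
                fast: std::array::from_fn(|_| AtomicUsize::new(0)),
                len: AtomicUsize::new(0),
                spill: RwLock::new(HashSet::new()),
            }
        }

        fn insert(&self, value: usize) {
            let idx = self.len.fetch_add(1, Ordering::AcqRel);
            if idx < MAX {
                // Still room in the array: claim slot `idx` without locking.
                self.fast[idx].store(value, Ordering::Release);
            } else {
                // Array is full: spill into the hash set instead of panicking.
                // This is the only place the write lock is taken.
                self.spill.write().unwrap().insert(value);
            }
        }

        fn contains(&self, value: usize) -> bool {
            let len = self.len.load(Ordering::Acquire);
            let in_array = self.fast[..len.min(MAX)]
                .iter()
                .any(|slot| slot.load(Ordering::Acquire) == value);
            // The read lock is only taken if an insert has ever spilled over.
            in_array || (len > MAX && self.spill.read().unwrap().contains(&value))
        }
    }

    fn main() {
        let set = SpillSet::<2>::new();
        for v in [10, 20, 30] {
            set.insert(v); // the third insert spills into the hash set
        }
        assert!(set.contains(10) && set.contains(30) && !set.contains(40));
    }

Keeping the array small keeps the linear scan cheap, while the rarely-touched spill set bounds the worst case to a hash lookup rather than a panic.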

File tree

4 files changed, +114 -25 lines


console-subscriber/Cargo.toml (+6)

@@ -5,6 +5,9 @@ authors = ["Eliza Weisman <[email protected]>"]
 edition = "2018"

 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[features]
+default = []
+parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]

 [dependencies]

@@ -19,6 +22,9 @@ futures = { version = "0.3", default-features = false }
 hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
+# The parking_lot dependency is renamed, because we want our `parking_lot`
+# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
+parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }

 [dev-dependencies]
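
For downstream users the feature is opt-in. A hedged example of what enabling it might look like in an application's own Cargo.toml (the git source spec is illustrative and not taken from this commit):

    [dependencies]
    console-subscriber = { git = "https://github.com/tokio-rs/console", features = ["parking_lot"] }

With the feature off (the default), the subscriber falls back to `std::sync::RwLock` and pulls in no extra dependency.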

console-subscriber/src/callsites.rs (+53 -21)

@@ -1,12 +1,15 @@
+use crate::sync::RwLock;
 use std::{
+    collections::HashSet,
     fmt, ptr,
     sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
 };
-use tracing_core::Metadata;
+use tracing_core::{callsite, Metadata};

 pub(crate) struct Callsites<const MAX_CALLSITES: usize> {
     ptrs: [AtomicPtr<Metadata<'static>>; MAX_CALLSITES],
     len: AtomicUsize,
+    spill: RwLock<HashSet<callsite::Identifier>>,
 }

 impl<const MAX_CALLSITES: usize> Callsites<MAX_CALLSITES> {
@@ -20,31 +23,58 @@ impl<const MAX_CALLSITES: usize> Callsites<MAX_CALLSITES> {
         }

         let idx = self.len.fetch_add(1, Ordering::AcqRel);
-        assert!(
-            idx < MAX_CALLSITES,
-            "you tried to store more than {} callsites, \
-             time to make the callsite sets bigger i guess \
-             (please open an issue for this)",
-            MAX_CALLSITES,
-        );
-        self.ptrs[idx]
-            .compare_exchange(
-                ptr::null_mut(),
-                callsite as *const _ as *mut _,
-                Ordering::AcqRel,
-                Ordering::Acquire,
-            )
-            .expect("a callsite would have been clobbered by `insert` (this is a bug)");
+        if idx <= MAX_CALLSITES {
+            // If there's still room in the callsites array, stick the address
+            // in there.
+            self.ptrs[idx]
+                .compare_exchange(
+                    ptr::null_mut(),
+                    callsite as *const _ as *mut _,
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                )
+                .expect("a callsite would have been clobbered by `insert` (this is a bug)");
+        } else {
+            // Otherwise, we've filled the callsite array (sad!). Spill over
+            // into a hash set.
+            self.spill.write().insert(callsite.callsite());
+        }
     }

     pub(crate) fn contains(&self, callsite: &'static Metadata<'static>) -> bool {
-        let len = self.len.load(Ordering::Acquire);
-        for cs in &self.ptrs[..len] {
-            if ptr::eq(cs.load(Ordering::Acquire), callsite) {
-                return true;
+        let mut start = 0;
+        let mut len = self.len.load(Ordering::Acquire);
+        loop {
+            for cs in &self.ptrs[start..len] {
+                if ptr::eq(cs.load(Ordering::Acquire), callsite) {
+                    return true;
+                }
             }
+
+            // Did the length change while we were iterating over the callsite array?
+            let new_len = self.len.load(Ordering::Acquire);
+            if new_len > len {
+                // If so, check again to see if the callsite is contained in any
+                // callsites that were pushed since the last time we loaded `self.len`.
+                start = len;
+                len = new_len;
+                continue;
+            }
+
+            // If the callsite array is not full, we have checked everything.
+            if len <= MAX_CALLSITES {
+                return false;
+            }
+
+            // Otherwise, we may have spilled over to the slower fallback hash
+            // set. Check that.
+            return self.check_spill(callsite);
         }
-        false
+    }
+
+    #[cold]
+    fn check_spill(&self, callsite: &'static Metadata<'static>) -> bool {
+        self.spill.read().contains(&callsite.callsite())
     }
 }

@@ -69,6 +99,7 @@ impl<const MAX_CALLSITES: usize> Default for Callsites<MAX_CALLSITES> {
         Self {
             ptrs: [NULLPTR; MAX_CALLSITES],
             len: AtomicUsize::new(0),
+            spill: Default::default(),
         }
     }
 }
@@ -80,6 +111,7 @@ impl<const MAX_CALLSITES: usize> fmt::Debug for Callsites<MAX_CALLSITES> {
             .field("ptrs", &&self.ptrs[..len])
             .field("len", &len)
             .field("max_callsites", &MAX_CALLSITES)
+            .field("spill", &self.spill)
             .finish()
     }
 }

console-subscriber/src/lib.rs (+6 -4)

@@ -21,6 +21,7 @@ mod builder;
 mod callsites;
 mod init;
 mod record;
+pub(crate) mod sync;

 use aggregator::Aggregator;
 pub use builder::Builder;
@@ -40,15 +41,16 @@ pub struct TasksLayer {
     /// Set of callsites for spans representing spawned tasks.
     ///
     /// For task spans, each runtime these will have like, 1-5 callsites in it, max, so
-    /// 16 is probably fine. For async operations, we may need a bigger callsites array.
-    spawn_callsites: Callsites<16>,
+    /// 8 should be plenty. If several runtimes are in use, we may have to spill
+    /// over into the backup hashmap, but it's unlikely.
+    spawn_callsites: Callsites<8>,

     /// Set of callsites for events representing waker operations.
     ///
-    /// 32 is probably a reasonable number of waker ops; it's a bit generous if
+    /// 16 is probably a reasonable number of waker ops; it's a bit generous if
     /// there's only one async runtime library in use, but if there are multiple,
     /// they might all have their own sets of waker ops.
-    waker_callsites: Callsites<32>,
+    waker_callsites: Callsites<16>,
 }

 pub struct Server {
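
As a hedged illustration of why these sets sit on the hot path: the layer consults them for every relevant span or event, so `contains` runs far more often than `insert`. The helper methods below are invented for the example (they are not part of this commit); only `Callsites::insert` ever takes the spill set's write lock, so these checks stay lock-free unless a set has spilled over.

    use tracing_core::Metadata;

    impl TasksLayer {
        /// Illustrative helper: is this metadata one of the spawn-span callsites?
        fn is_spawn_span(&self, meta: &'static Metadata<'static>) -> bool {
            self.spawn_callsites.contains(meta)
        }

        /// Illustrative helper: is this metadata one of the waker-op callsites?
        fn is_waker_op(&self, meta: &'static Metadata<'static>) -> bool {
            self.waker_callsites.contains(meta)
        }
    }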

console-subscriber/src/sync.rs (new file, +49)

@@ -0,0 +1,49 @@
+// Some of these methods and re-exports may not be used currently.
+#![allow(dead_code, unused_imports)]
+
+#[cfg(feature = "parking_lot")]
+pub(crate) use parking_lot_crate::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+
+#[cfg(not(feature = "parking_lot"))]
+pub(crate) use self::std_impl::*;
+
+#[cfg(not(feature = "parking_lot"))]
+mod std_impl {
+    use std::sync::{self, PoisonError, TryLockError};
+    pub use std::sync::{RwLockReadGuard, RwLockWriteGuard};
+
+    #[derive(Debug, Default)]
+    pub(crate) struct RwLock<T: ?Sized>(sync::RwLock<T>);
+
+    impl<T> RwLock<T> {
+        pub(crate) fn new(data: T) -> Self {
+            Self(sync::RwLock::new(data))
+        }
+    }
+
+    impl<T: ?Sized> RwLock<T> {
+        pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> {
+            self.0.read().unwrap_or_else(PoisonError::into_inner)
+        }
+
+        pub(crate) fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
+            match self.0.try_read() {
+                Ok(guard) => Some(guard),
+                Err(TryLockError::Poisoned(p)) => Some(p.into_inner()),
+                Err(TryLockError::WouldBlock) => None,
+            }
+        }
+
+        pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> {
+            self.0.write().unwrap_or_else(PoisonError::into_inner)
+        }
+
+        pub(crate) fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
+            match self.0.try_write() {
+                Ok(guard) => Some(guard),
+                Err(TryLockError::Poisoned(p)) => Some(p.into_inner()),
+                Err(TryLockError::WouldBlock) => None,
+            }
+        }
+    }
+}
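
A brief hedged sketch of why this facade exists: callers such as `callsites.rs` can take the lock the same way under either backend, since the std wrapper's `read()`/`write()` return guards directly (swallowing poisoning) just as `parking_lot`'s do. The helper function below is illustrative, not part of this commit.

    use crate::sync::RwLock;
    use std::collections::HashSet;
    use tracing_core::callsite;

    // Compiles unchanged with either backend: `read()` yields the guard
    // directly, with no `Result` to unwrap when `parking_lot` is disabled.
    fn spilled_contains(
        spill: &RwLock<HashSet<callsite::Identifier>>,
        id: &callsite::Identifier,
    ) -> bool {
        spill.read().contains(id)
    }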

0 commit comments
