Commit 94e7834

feat(subscriber): Record and send poll times with HdrHistogram (#47)
As suggested in the issue, this PR uses HdrHistogram to record task poll times as nanoseconds. The histogram is serialized into bytes using the V2Serializer. The subscriber now offers a second RPC stream called TaskDetails for a given task ID. The console app starts a details stream when you view the details of a task and stops the stream when you leave it. To demonstrate that it works, I also added some stats and a small chart to the task view using the histogram data. But I won't insist on merging this part, since you may have other plans for the task details view. Closes #36
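For orientation (not part of this commit), here is a minimal standalone sketch of the recording-and-serialization approach the subscriber takes, using the hdrhistogram 7.x API; the `poll_duration` value and the `main` wrapper are illustrative only:

use std::convert::TryInto;
use std::time::Duration;

use hdrhistogram::{
    serialization::{Serializer, V2Serializer},
    Histogram,
};

fn main() {
    // Two significant figures keeps the histogram's memory footprint small;
    // precision (and memory use) grows quickly with a higher sigfig.
    let mut poll_times = Histogram::<u64>::new(2).expect("sigfig must be in 0..=5");

    // Record one poll duration in nanoseconds, saturating at u64::MAX.
    let poll_duration = Duration::from_micros(250);
    let nanos: u64 = poll_duration.as_nanos().try_into().unwrap_or(u64::MAX);
    poll_times.record(nanos).expect("failed to record value");

    // Serialize to bytes with the V2 format, as the aggregator does before
    // sending the histogram on the TaskDetails stream.
    let mut buf = Vec::new();
    V2Serializer::new()
        .serialize(&poll_times, &mut buf)
        .expect("failed to serialize histogram");
    println!("serialized {} bytes", buf.len());
}

The same three steps (create with a small sigfig, record nanoseconds with saturation, serialize with V2Serializer) appear in the aggregator diff below.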
1 parent: 5568bf6

11 files changed (+789, -54)

console-api/src/common.rs (+12)

@@ -129,6 +129,18 @@ impl From<tracing_core::span::Id> for SpanId {
     }
 }
 
+impl From<SpanId> for tracing_core::span::Id {
+    fn from(span_id: SpanId) -> Self {
+        tracing_core::span::Id::from_u64(span_id.id)
+    }
+}
+
+impl From<u64> for SpanId {
+    fn from(id: u64) -> Self {
+        SpanId { id }
+    }
+}
+
 impl From<&'static tracing_core::Metadata<'static>> for register_metadata::NewMetadata {
     fn from(meta: &'static tracing_core::Metadata) -> Self {
         register_metadata::NewMetadata {
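These conversions let the subscriber turn the protobuf task ID from a TaskDetails request back into a `tracing_core::span::Id` for the stats lookup (see `add_task_detail_subscription` in the aggregator diff below). A small sketch of how they compose; the re-export path of the generated `SpanId` type is an assumption here, not taken from this commit:

// Hypothetical usage of the new conversions; `SpanId` is the prost-generated
// message with the `id: u64` field shown above.
use console_api::common::SpanId; // exact module path assumed

fn main() {
    let proto_id: SpanId = SpanId::from(42u64);            // u64 -> SpanId
    let span_id: tracing_core::span::Id = proto_id.into(); // SpanId -> span::Id
    assert_eq!(span_id.into_u64(), 42);
}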

console-subscriber/Cargo.toml (+1)

@@ -16,6 +16,7 @@ tracing-core = "0.1.18"
 tracing = "0.1.26"
 tracing-subscriber = { version = "0.2.17", default-features = false, features = ["fmt", "registry", "env-filter"] }
 futures = { version = "0.3", default-features = false }
+hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
 
 [dev-dependencies]

console-subscriber/src/aggregator.rs (+155, -35)

@@ -1,10 +1,13 @@
-use super::{Event, WakeOp, Watch};
+use crate::WatchRequest;
+
+use super::{Event, WakeOp, Watch, WatchKind};
 use console_api as proto;
 use tokio::sync::{mpsc, Notify};
 
 use futures::FutureExt;
 use std::{
     collections::HashMap,
+    convert::TryInto,
     ops::{Deref, DerefMut},
     sync::{
         atomic::{AtomicBool, Ordering::*},
@@ -13,12 +16,18 @@ use std::{
     time::{Duration, SystemTime},
 };
 use tracing_core::{span, Metadata};
+
+use hdrhistogram::{
+    serialization::{Serializer, V2SerializeError, V2Serializer},
+    Histogram,
+};
+
 pub(crate) struct Aggregator {
     /// Channel of incoming events emitted by `TaskLayer`s.
     events: mpsc::Receiver<Event>,
 
-    /// New incoming `WatchTasks` RPCs.
-    rpcs: mpsc::Receiver<Watch>,
+    /// New incoming RPCs.
+    rpcs: mpsc::Receiver<WatchKind>,
 
     /// The interval at which new data updates are pushed to clients.
     publish_interval: Duration,
@@ -29,8 +38,11 @@ pub(crate) struct Aggregator {
     /// Triggers a flush when the event buffer is approaching capacity.
     flush_capacity: Arc<Flush>,
 
-    // Currently active RPCs streaming task events.
-    watchers: Vec<Watch>,
+    /// Currently active RPCs streaming task events.
+    watchers: Vec<Watch<proto::tasks::TaskUpdate>>,
+
+    /// Currently active RPCs streaming task details events, by task ID.
+    details_watchers: HashMap<span::Id, Vec<Watch<proto::tasks::TaskDetails>>>,
 
     /// *All* metadata for task spans and user-defined spans that we care about.
     ///
@@ -55,7 +67,6 @@ pub(crate) struct Flush {
     pub(crate) triggered: AtomicBool,
 }
 
-#[derive(Default)]
 struct Stats {
     // task stats
    polls: u64,
@@ -71,6 +82,8 @@ struct Stats {
     waker_clones: u64,
     waker_drops: u64,
     last_wake: Option<SystemTime>,
+
+    poll_times_histogram: Histogram<u64>,
 }
 
 #[derive(Default)]
@@ -83,10 +96,31 @@ struct Task {
     fields: Vec<proto::Field>,
 }
 
+impl Default for Stats {
+    fn default() -> Self {
+        Stats {
+            polls: 0,
+            current_polls: 0,
+            created_at: None,
+            first_poll: None,
+            last_poll: None,
+            busy_time: Default::default(),
+            closed_at: None,
+            wakes: 0,
+            waker_clones: 0,
+            waker_drops: 0,
+            last_wake: None,
+            // significant figures should be in the [0-5] range and memory usage
+            // grows exponentially with higher a sigfig
+            poll_times_histogram: Histogram::<u64>::new(2).unwrap(),
+        }
+    }
+}
+
 impl Aggregator {
     pub(crate) fn new(
         events: mpsc::Receiver<Event>,
-        rpcs: mpsc::Receiver<Watch>,
+        rpcs: mpsc::Receiver<WatchKind>,
         builder: &crate::Builder,
     ) -> Self {
         Self {
@@ -99,6 +133,7 @@ impl Aggregator {
             retention: builder.retention,
             events,
             watchers: Vec::new(),
+            details_watchers: HashMap::new(),
             all_metadata: Vec::new(),
             new_metadata: Vec::new(),
             tasks: TaskData {
@@ -130,31 +165,18 @@ impl Aggregator {
 
                 // a new client has started watching!
                 subscription = self.rpcs.recv() => {
-                    if let Some(subscription) = subscription {
-                        tracing::debug!("new subscription");
-                        let new_tasks = self.tasks.all().map(|(id, task)| {
-                            task.to_proto(id.clone())
-                        }).collect();
-                        let now = SystemTime::now();
-                        let stats_update = self.stats.all().map(|(id, stats)| {
-                            (id.into_u64(), stats.to_proto())
-                        }).collect();
-                        // Send the initial state --- if this fails, the subscription is
-                        // already dead.
-                        if subscription.update(&proto::tasks::TaskUpdate {
-                            new_metadata: Some(proto::RegisterMetadata {
-                                metadata: self.all_metadata.clone(),
-                            }),
-                            new_tasks,
-                            stats_update,
-                            now: Some(now.into()),
-                        }) {
-                            self.watchers.push(subscription)
+                    match subscription {
+                        Some(WatchKind::Tasks(subscription)) => {
+                            self.add_task_subscription(subscription);
+                        },
+                        Some(WatchKind::TaskDetail(watch_request)) => {
+                            self.add_task_detail_subscription(watch_request);
+                        },
+                        _ => {
+                            tracing::debug!("rpc channel closed, terminating");
+                            return;
                         }
-                    } else {
-                        tracing::debug!("rpc channel closed, terminating");
-                        return;
-                    }
+                    };
 
                     false
                 }
@@ -193,6 +215,68 @@ impl Aggregator {
         }
     }
 
+    /// Add the task subscription to the watchers after sending the first update
+    fn add_task_subscription(&mut self, subscription: Watch<proto::tasks::TaskUpdate>) {
+        tracing::debug!("new tasks subscription");
+        let new_tasks = self
+            .tasks
+            .all()
+            .map(|(id, task)| task.to_proto(id.clone()))
+            .collect();
+        let now = SystemTime::now();
+        let stats_update = self
+            .stats
+            .all()
+            .map(|(id, stats)| (id.into_u64(), stats.to_proto()))
+            .collect();
+        // Send the initial state --- if this fails, the subscription is already dead
+        if subscription.update(&proto::tasks::TaskUpdate {
+            new_metadata: Some(proto::RegisterMetadata {
+                metadata: self.all_metadata.clone(),
+            }),
+            new_tasks,
+            stats_update,
+            now: Some(now.into()),
+        }) {
+            self.watchers.push(subscription)
+        }
+    }
+
+    /// Add the task details subscription to the watchers after sending the first update,
+    /// if the task is found.
+    fn add_task_detail_subscription(
+        &mut self,
+        watch_request: WatchRequest<proto::tasks::TaskDetails>,
+    ) {
+        let WatchRequest {
+            id,
+            stream_sender,
+            buffer,
+        } = watch_request;
+        tracing::debug!(id = ?id, "new task details subscription");
+        let task_id: span::Id = id.into();
+        if let Some(stats) = self.stats.get(&task_id) {
+            let (tx, rx) = mpsc::channel(buffer);
+            let subscription = Watch(tx);
+            let now = SystemTime::now();
+            // Send back the stream receiver.
+            // Then send the initial state --- if this fails, the subscription is already dead.
+            if stream_sender.send(rx).is_ok()
+                && subscription.update(&proto::tasks::TaskDetails {
+                    task_id: Some(task_id.clone().into()),
+                    now: Some(now.into()),
+                    poll_times_histogram: serialize_histogram(&stats.poll_times_histogram).ok(),
+                })
+            {
+                self.details_watchers
+                    .entry(task_id)
+                    .or_insert_with(Vec::new)
+                    .push(subscription);
+            }
+        }
+        // If the task is not found, drop `stream_sender` which will result in a not found error
+    }
+
     /// Publish the current state to all active watchers.
     ///
     /// This drops any watchers which have closed the RPC, or whose update
@@ -216,13 +300,33 @@ impl Aggregator {
             .since_last_update()
             .map(|(id, stats)| (id.into_u64(), stats.to_proto()))
             .collect();
+
         let update = proto::tasks::TaskUpdate {
             new_metadata,
             new_tasks,
             stats_update,
             now: Some(now.into()),
         };
-        self.watchers.retain(|watch: &Watch| watch.update(&update));
+        self.watchers
+            .retain(|watch: &Watch<proto::tasks::TaskUpdate>| watch.update(&update));
+
+        let stats = &self.stats;
+        // Assuming there are much fewer task details subscribers than there are
+        // stats updates, iterate over `details_watchers` and compact the map.
+        self.details_watchers.retain(|id, watchers| {
+            if let Some(task_stats) = stats.get(id) {
+                let details = proto::tasks::TaskDetails {
+                    task_id: Some(id.clone().into()),
+                    now: Some(now.into()),
+                    poll_times_histogram: serialize_histogram(&task_stats.poll_times_histogram)
+                        .ok(),
+                };
+                watchers.retain(|watch| watch.update(&details));
+                !watchers.is_empty()
+            } else {
+                false
+            }
+        });
     }
 
     /// Update the current state with data from a single event.
@@ -273,7 +377,12 @@ impl Aggregator {
                 stats.current_polls -= 1;
                 if stats.current_polls == 0 {
                     if let Some(last_poll) = stats.last_poll {
-                        stats.busy_time += at.duration_since(last_poll).unwrap();
+                        let elapsed = at.duration_since(last_poll).unwrap();
+                        stats.busy_time += elapsed;
+                        stats
+                            .poll_times_histogram
+                            .record(elapsed.as_nanos().try_into().unwrap_or(u64::MAX))
+                            .unwrap();
                     }
                 }
             }
@@ -427,6 +536,10 @@ impl<T> TaskData<T> {
     fn all(&self) -> impl Iterator<Item = (&span::Id, &T)> {
         self.data.iter().map(|(id, (data, _))| (id, data))
     }
+
+    fn get(&self, id: &span::Id) -> Option<&T> {
+        self.data.get(id).map(|(data, _)| data)
+    }
 }
 
 struct Updating<'a, T>(&'a mut (T, bool));
@@ -450,8 +563,8 @@ impl<'a, T> Drop for Updating<'a, T> {
     }
 }
 
-impl Watch {
-    fn update(&self, update: &proto::tasks::TaskUpdate) -> bool {
+impl<T: Clone> Watch<T> {
+    fn update(&self, update: &T) -> bool {
         if let Ok(reserve) = self.0.try_reserve() {
             reserve.send(Ok(update.clone()));
             true
@@ -497,3 +610,10 @@ impl Task {
         }
     }
 }
+
+fn serialize_histogram(histogram: &Histogram<u64>) -> Result<Vec<u8>, V2SerializeError> {
+    let mut serializer = V2Serializer::new();
+    let mut buf = Vec::new();
+    serializer.serialize(histogram, &mut buf)?;
+    Ok(buf)
+}
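The console-side changes that read this histogram back and render the stats and small chart mentioned in the commit message live in the console app and are not among the files shown here. As a rough sketch of what a client can do with the `poll_times_histogram` bytes from a TaskDetails update, assuming the hdrhistogram 7.x Deserializer API and an illustrative `summarize` helper:

use hdrhistogram::{serialization::Deserializer, Histogram};

// `bytes` would be the `poll_times_histogram` field of a TaskDetails message;
// error handling is elided with expect() for brevity.
fn summarize(bytes: &[u8]) {
    let histogram: Histogram<u64> = Deserializer::new()
        .deserialize(&mut &bytes[..])
        .expect("histogram bytes should be valid V2-serialized data");

    // Poll times were recorded in nanoseconds.
    println!("polls: {}", histogram.len());
    println!("p50:   {} ns", histogram.value_at_quantile(0.5));
    println!("p99:   {} ns", histogram.value_at_quantile(0.99));
    println!("max:   {} ns", histogram.max());

    // A coarse chart can be derived from fixed-width buckets (100µs here).
    for bucket in histogram.iter_linear(100_000) {
        println!(
            "<= {:>12} ns: {}",
            bucket.value_iterated_to(),
            bucket.count_since_last_iteration()
        );
    }
}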
