Skip to content

Commit f4a21ac

Browse files
zaharidichevhawkw
andauthored
feat(subscriber): resource instrumentation (#77)
This PR adds the pieces needed to get resource instrumentation data into the console. This includes: - changes to the proto definitions - changes to the subscriber The UI part will come as a follow up PR. This branch uses a patched `tokio` that emits these tracing spans and events for the `Sleep` resource. You can look at the raw data by running: ``` cargo run --example app cargo run --example dump ``` The information piped through includes: - data describing the resource lifecycle, namely when resources are created and dropped - data describing the async operations that take place on these events and their associationg with tasks - data reflecting the state updates that take place on resources (e.g. resetting timer's duration, adding permits to a semaphore, etc) Signed-off-by: Zahari Dichev <[email protected]> Co-authored-by: Eliza Weisman <[email protected]>
1 parent 5fe4437 commit f4a21ac

25 files changed

+1807
-508
lines changed

Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,7 @@ members = [
44
"console-subscriber",
55
"console-api"
66
]
7-
resolver = "2"
7+
resolver = "2"
8+
9+
[patch.crates-io]
10+
tokio = { git = 'https://github.com/zaharidichev/tokio', branch = 'zd/instrument-sleep' }

console-api/build.rs

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ fn main() -> Result<(), Box<dyn Error>> {
55
"../proto/trace.proto",
66
"../proto/common.proto",
77
"../proto/tasks.proto",
8+
"../proto/instrument.proto",
9+
"../proto/resources.proto",
10+
"../proto/async_ops.proto",
811
];
912
let dirs = &["../proto"];
1013

console-api/src/async_ops.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
tonic::include_proto!("rs.tokio.console.async_ops");

console-api/src/common.rs

+45-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::fmt;
2+
use std::hash::{Hash, Hasher};
23

34
tonic::include_proto!("rs.tokio.console.common");
45

@@ -32,19 +33,11 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata {
3233
metadata::Kind::Event
3334
};
3435

35-
let location = Location {
36-
file: meta.file().map(String::from),
37-
module_path: meta.module_path().map(String::from),
38-
line: meta.line(),
39-
column: None, // tracing doesn't support columns yet
40-
};
41-
4236
let field_names = meta.fields().iter().map(|f| f.name().to_string()).collect();
43-
4437
Metadata {
4538
name: meta.name().to_string(),
4639
target: meta.target().to_string(),
47-
location: Some(location),
40+
location: Some(meta.into()),
4841
kind: kind as i32,
4942
level: metadata::Level::from(*meta.level()) as i32,
5043
field_names,
@@ -53,6 +46,17 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata {
5346
}
5447
}
5548

49+
impl<'a> From<&'a tracing_core::Metadata<'a>> for Location {
50+
fn from(meta: &'a tracing_core::Metadata<'a>) -> Self {
51+
Location {
52+
file: meta.file().map(String::from),
53+
module_path: meta.module_path().map(String::from),
54+
line: meta.line(),
55+
column: None, // tracing doesn't support columns yet
56+
}
57+
}
58+
}
59+
5660
impl<'a> From<&'a std::panic::Location<'a>> for Location {
5761
fn from(loc: &'a std::panic::Location<'a>) -> Self {
5862
Location {
@@ -185,3 +189,35 @@ impl From<&dyn std::fmt::Debug> for field::Value {
185189
field::Value::DebugVal(format!("{:?}", val))
186190
}
187191
}
192+
193+
// Clippy warns when a type derives `PartialEq` but has a manual `Hash` impl,
194+
// or vice versa. However, this is unavoidable here, because `prost` generates
195+
// a struct with `#[derive(PartialEq)]`, but we cannot add`#[derive(Hash)]` to the
196+
// generated code.
197+
#[allow(clippy::derive_hash_xor_eq)]
198+
impl Hash for field::Name {
199+
fn hash<H: Hasher>(&self, state: &mut H) {
200+
match self {
201+
field::Name::NameIdx(idx) => idx.hash(state),
202+
field::Name::StrName(s) => s.hash(state),
203+
}
204+
}
205+
}
206+
207+
impl Eq for field::Name {}
208+
209+
// === IDs ===
210+
211+
impl From<u64> for Id {
212+
fn from(id: u64) -> Self {
213+
Id { id }
214+
}
215+
}
216+
217+
impl From<Id> for u64 {
218+
fn from(id: Id) -> Self {
219+
id.id
220+
}
221+
}
222+
223+
impl Copy for Id {}

console-api/src/instrument.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
tonic::include_proto!("rs.tokio.console.instrument");

console-api/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
pub mod async_ops;
12
mod common;
3+
pub mod instrument;
4+
pub mod resources;
25
pub mod tasks;
36
pub mod trace;
47
pub use common::*;

console-api/src/resources.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
tonic::include_proto!("rs.tokio.console.resources");

console-api/src/tasks.rs

-16
Original file line numberDiff line numberDiff line change
@@ -1,17 +1 @@
11
tonic::include_proto!("rs.tokio.console.tasks");
2-
3-
// === IDs ===
4-
5-
impl From<u64> for TaskId {
6-
fn from(id: u64) -> Self {
7-
TaskId { id }
8-
}
9-
}
10-
11-
impl From<TaskId> for u64 {
12-
fn from(id: TaskId) -> Self {
13-
id.id
14-
}
15-
}
16-
17-
impl Copy for TaskId {}

console-subscriber/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]
1313

1414
tokio = { version = "^1.10", features = ["sync", "time", "macros", "tracing"]}
1515
tokio-stream = "0.1"
16+
thread_local = "1.1.3"
1617
console-api = { path = "../console-api", features = ["transport"]}
1718
tonic = { version = "0.5", features = ["transport"] }
1819
tracing-core = "0.1.18"

console-subscriber/examples/dump.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use console_api::tasks::{tasks_client::TasksClient, TasksRequest};
1+
use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest};
22
use futures::stream::StreamExt;
33

44
#[tokio::main]
@@ -11,16 +11,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1111
});
1212

1313
eprintln!("CONNECTING: {}", target);
14-
let mut client = TasksClient::connect(target).await?;
14+
let mut client = InstrumentClient::connect(target).await?;
1515

16-
let request = tonic::Request::new(TasksRequest {});
17-
let mut stream = client.watch_tasks(request).await?.into_inner();
16+
let request = tonic::Request::new(InstrumentRequest {});
17+
let mut stream = client.watch_updates(request).await?.into_inner();
1818

1919
let mut i: usize = 0;
2020
while let Some(update) = stream.next().await {
2121
match update {
2222
Ok(update) => {
23-
eprintln!("UPDATE {}: {:#?}\n", i, update);
23+
println!("UPDATE {}: {:#?}\n", i, update);
2424
i += 1;
2525
}
2626
Err(e) => {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
use super::{shrink::ShrinkMap, Closable, Id, Ids, ToProto};
2+
use std::collections::{HashMap, HashSet};
3+
use std::ops::{Deref, DerefMut};
4+
use std::time::{Duration, SystemTime};
5+
6+
pub(crate) struct IdData<T> {
7+
data: ShrinkMap<Id, (T, bool)>,
8+
}
9+
10+
pub(crate) struct Updating<'a, T>(&'a mut (T, bool));
11+
12+
pub(crate) enum Include {
13+
All,
14+
UpdatedOnly,
15+
}
16+
17+
// === impl IdData ===
18+
19+
impl<T> Default for IdData<T> {
20+
fn default() -> Self {
21+
IdData {
22+
data: ShrinkMap::<Id, (T, bool)>::new(),
23+
}
24+
}
25+
}
26+
27+
impl<T> IdData<T> {
28+
pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T>
29+
where
30+
T: Default,
31+
{
32+
Updating(self.data.entry(id).or_default())
33+
}
34+
35+
pub(crate) fn update(&mut self, id: &Id) -> Option<Updating<'_, T>> {
36+
self.data.get_mut(id).map(Updating)
37+
}
38+
39+
pub(crate) fn insert(&mut self, id: Id, data: T) {
40+
self.data.insert(id, (data, true));
41+
}
42+
43+
pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
44+
self.data.iter_mut().filter_map(|(id, (data, dirty))| {
45+
if *dirty {
46+
*dirty = false;
47+
Some((id, data))
48+
} else {
49+
None
50+
}
51+
})
52+
}
53+
54+
pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
55+
self.data.iter().map(|(id, (data, _))| (id, data))
56+
}
57+
58+
pub(crate) fn get(&self, id: &Id) -> Option<&T> {
59+
self.data.get(id).map(|(data, _)| data)
60+
}
61+
62+
pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
63+
where
64+
T: ToProto,
65+
{
66+
match include {
67+
Include::UpdatedOnly => self
68+
.since_last_update()
69+
.map(|(id, d)| (*id, d.to_proto()))
70+
.collect(),
71+
Include::All => self.all().map(|(id, d)| (*id, d.to_proto())).collect(),
72+
}
73+
}
74+
75+
pub(crate) fn drop_closed<R: Closable>(
76+
&mut self,
77+
stats: &mut IdData<R>,
78+
now: SystemTime,
79+
retention: Duration,
80+
has_watchers: bool,
81+
ids: &mut Ids,
82+
) {
83+
let _span = tracing::debug_span!(
84+
"drop_closed",
85+
entity = %std::any::type_name::<T>(),
86+
stats = %std::any::type_name::<R>(),
87+
)
88+
.entered();
89+
90+
// drop closed entities
91+
tracing::trace!(?retention, has_watchers, "dropping closed");
92+
93+
let mut dropped_ids = HashSet::new();
94+
stats.data.retain_and_shrink(|id, (stats, dirty)| {
95+
if let Some(closed) = stats.closed_at() {
96+
let closed_for = now.duration_since(closed).unwrap_or_default();
97+
let should_drop =
98+
// if there are any clients watching, retain all dirty tasks regardless of age
99+
(*dirty && has_watchers)
100+
|| closed_for > retention;
101+
tracing::trace!(
102+
stats.id = ?id,
103+
stats.closed_at = ?closed,
104+
stats.closed_for = ?closed_for,
105+
stats.dirty = *dirty,
106+
should_drop,
107+
);
108+
109+
if should_drop {
110+
dropped_ids.insert(*id);
111+
}
112+
return !should_drop;
113+
}
114+
115+
true
116+
});
117+
118+
// drop closed entities which no longer have stats.
119+
self.data
120+
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
121+
122+
if !dropped_ids.is_empty() {
123+
// drop closed entities which no longer have stats.
124+
self.data
125+
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
126+
ids.remove_all(&dropped_ids);
127+
}
128+
}
129+
}
130+
131+
// === impl Updating ===
132+
133+
impl<'a, T> Deref for Updating<'a, T> {
134+
type Target = T;
135+
fn deref(&self) -> &Self::Target {
136+
&self.0 .0
137+
}
138+
}
139+
140+
impl<'a, T> DerefMut for Updating<'a, T> {
141+
fn deref_mut(&mut self) -> &mut Self::Target {
142+
&mut self.0 .0
143+
}
144+
}
145+
146+
impl<'a, T> Drop for Updating<'a, T> {
147+
fn drop(&mut self) {
148+
self.0 .1 = true;
149+
}
150+
}

0 commit comments

Comments
 (0)