Skip to content

Commit aa09600

Browse files
authored
feat(subscriber): count dropped events due to buffer cap (#211)
See also #209 Currently, we don't really have any way of surfacing potential data loss due to the event buffer capacity limit --- so, if events are dropped, the user may not be *aware* that they're seeing an incomplete picture of their application. It would be better if we had a way to surface this in the UI. This branch adds support for counting the number of dropped events in the `ConsoleLayer`. This data is now included in the `ResourceUpdate`, `TaskUpdate`, and `AsyncOpUpdate` messages, respectively. We track a separate counter for dropped events of each type, to make it easier to determine what data may be missing. The console UI doesn't currently *display* these counts; that can be added in a separate PR. We may want to use the warnings interface for displaying this information?
1 parent 6d64810 commit aa09600

File tree

5 files changed

+205
-96
lines changed

5 files changed

+205
-96
lines changed

Diff for: console-api/proto/async_ops.proto

+11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ message AsyncOpUpdate {
1919
repeated AsyncOp new_async_ops = 1;
2020
// Any async op stats that have changed since the last update.
2121
map<uint64, Stats> stats_update = 2;
22+
// A count of how many async op events (e.g. polls, creation, etc) were not
23+
// recorded because the application's event buffer was at capacity.
24+
//
25+
// If everything is working normally, this should be 0. If it is greater
26+
// than 0, that may indicate that some data is missing from this update, and
27+
// it may be necessary to increase the number of events buffered by the
28+
// application to ensure that data loss is avoided.
29+
//
30+
// If the application's instrumentation ensures reliable delivery of events,
31+
// this will always be 0.
32+
uint64 dropped_events = 3;
2233
}
2334

2435
// An async operation.

Diff for: console-api/proto/resources.proto

+12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@ message ResourceUpdate {
2323

2424
// A list of all new poll ops that have been invoked on resources since the last update.
2525
repeated PollOp new_poll_ops = 3;
26+
27+
// A count of how many resource events (e.g. polls, creation, etc) were not
28+
// recorded because the application's event buffer was at capacity.
29+
//
30+
// If everything is working normally, this should be 0. If it is greater
31+
// than 0, that may indicate that some data is missing from this update, and
32+
// it may be necessary to increase the number of events buffered by the
33+
// application to ensure that data loss is avoided.
34+
//
35+
// If the application's instrumentation ensures reliable delivery of events,
36+
// this will always be 0.
37+
uint64 dropped_events = 4;
2638
}
2739

2840
// Static data recorded when a new resource is created.

Diff for: console-api/proto/tasks.proto

+11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ message TaskUpdate {
2626
// *is* included in this map, the corresponding value represents a complete
2727
// snapshot of that task's stats at in the current time window.
2828
map<uint64, Stats> stats_update = 3;
29+
// A count of how many task events (e.g. polls, spawns, etc) were not
30+
// recorded because the application's event buffer was at capacity.
31+
//
32+
// If everything is working normally, this should be 0. If it is greater
33+
// than 0, that may indicate that some data is missing from this update, and
34+
// it may be necessary to increase the number of events buffered by the
35+
// application to ensure that data loss is avoided.
36+
//
37+
// If the application's instrumentation ensures reliable delivery of events,
38+
// this will always be 0.
39+
uint64 dropped_events = 4;
2940
}
3041

3142
// A task details update

Diff for: console-subscriber/src/aggregator/mod.rs

+20-14
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use super::{AttributeUpdate, AttributeUpdateOp, Command, Event, UpdateType, WakeOp, Watch};
1+
use super::{
2+
AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, UpdateType, WakeOp, Watch,
3+
};
24
use crate::{record::Recorder, WatchRequest};
35
use console_api as proto;
46
use proto::resources::resource;
@@ -42,8 +44,9 @@ pub(crate) struct Aggregator {
4244
/// How long to keep task data after a task has completed.
4345
retention: Duration,
4446

45-
/// Triggers a flush when the event buffer is approaching capacity.
46-
flush_capacity: Arc<Flush>,
47+
/// Shared state, including a `Notify` that triggers a flush when the event
48+
/// buffer is approaching capacity.
49+
shared: Arc<Shared>,
4750

4851
/// Currently active RPCs streaming task events.
4952
watchers: ShrinkVec<Watch<proto::instrument::Update>>,
@@ -99,7 +102,7 @@ pub(crate) struct Aggregator {
99102
temporality: Temporality,
100103
}
101104

102-
#[derive(Debug)]
105+
#[derive(Debug, Default)]
103106
pub(crate) struct Flush {
104107
pub(crate) should_flush: Notify,
105108
triggered: AtomicBool,
@@ -285,12 +288,10 @@ impl Aggregator {
285288
events: mpsc::Receiver<Event>,
286289
rpcs: mpsc::Receiver<Command>,
287290
builder: &crate::Builder,
291+
shared: Arc<crate::Shared>,
288292
) -> Self {
289293
Self {
290-
flush_capacity: Arc::new(Flush {
291-
should_flush: Notify::new(),
292-
triggered: AtomicBool::new(false),
293-
}),
294+
shared,
294295
rpcs,
295296
publish_interval: builder.publish_interval,
296297
retention: builder.retention,
@@ -316,10 +317,6 @@ impl Aggregator {
316317
}
317318
}
318319

319-
pub(crate) fn flush(&self) -> &Arc<Flush> {
320-
&self.flush_capacity
321-
}
322-
323320
pub(crate) async fn run(mut self) {
324321
let mut publish = tokio::time::interval(self.publish_interval);
325322
loop {
@@ -333,7 +330,7 @@ impl Aggregator {
333330
}
334331

335332
// triggered when the event buffer is approaching capacity
336-
_ = self.flush_capacity.should_flush.notified() => {
333+
_ = self.shared.flush.should_flush.notified() => {
337334
tracing::debug!("approaching capacity; draining buffer");
338335
false
339336
}
@@ -399,7 +396,7 @@ impl Aggregator {
399396
}
400397
self.cleanup_closed();
401398
if drained {
402-
self.flush_capacity.has_flushed();
399+
self.shared.flush.has_flushed();
403400
}
404401
}
405402
}
@@ -445,6 +442,7 @@ impl Aggregator {
445442
.map(|(_, value)| value.to_proto())
446443
.collect(),
447444
stats_update: self.task_stats.as_proto(Include::All),
445+
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
448446
}),
449447
resource_update: Some(proto::resources::ResourceUpdate {
450448
new_resources: self
@@ -454,6 +452,7 @@ impl Aggregator {
454452
.collect(),
455453
stats_update: self.resource_stats.as_proto(Include::All),
456454
new_poll_ops: (*self.all_poll_ops).clone(),
455+
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
457456
}),
458457
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
459458
new_async_ops: self
@@ -462,6 +461,7 @@ impl Aggregator {
462461
.map(|(_, value)| value.to_proto())
463462
.collect(),
464463
stats_update: self.async_op_stats.as_proto(Include::All),
464+
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
465465
}),
466466
now: Some(now.into()),
467467
new_metadata: Some(proto::RegisterMetadata {
@@ -534,6 +534,8 @@ impl Aggregator {
534534
.map(|(_, value)| value.to_proto())
535535
.collect(),
536536
stats_update: self.task_stats.as_proto(Include::UpdatedOnly),
537+
538+
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
537539
}),
538540
resource_update: Some(proto::resources::ResourceUpdate {
539541
new_resources: self
@@ -543,6 +545,8 @@ impl Aggregator {
543545
.collect(),
544546
stats_update: self.resource_stats.as_proto(Include::UpdatedOnly),
545547
new_poll_ops,
548+
549+
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
546550
}),
547551
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
548552
new_async_ops: self
@@ -551,6 +555,8 @@ impl Aggregator {
551555
.map(|(_, value)| value.to_proto())
552556
.collect(),
553557
stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly),
558+
559+
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
554560
}),
555561
};
556562

0 commit comments

Comments
 (0)