Skip to content

Commit 1c40e67

Browse files
committed
storage,sql: fix naming nits in mz_source_statistics
To comport with the system catalog style guide: * Change `rehydration_latency_ms uint8` to `rehydration_latency interval`, since interval types are preferred to integers where the units are indicated in the column name's suffix. This rule was undocumented, so also add it to the style guide. * Rename `envelope_state_count` to `envelope_state_records`. This one is more subjective, but "records" aligns with names used in other relations, like `mz_dataflow_arrangement_sizes.records` and `mz_records_per_dataflow`.
1 parent 3f18e48 commit 1c40e67

File tree

13 files changed

+78
-73
lines changed

13 files changed

+78
-73
lines changed

doc/developer/style.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,12 @@ We adhere to standards for our system catalog relations (tables, views), which i
128128
Modeling standards:
129129
* Normalize the schema for tables. If you’re adding a table that adds detail to rows in an existing table, refer to those rows by ID, and don’t duplicate columns that already exist. E.g., the `mz_kafka_sources` table does not include the name of the source, since that information is available in the `mz_sources` table.
130130
* Remember, Materialize is good at joins! We can always add syntax sugar via a `SHOW` command or a view to spare users from typing out the joins for common queries.
131+
* Use the `interval` type to represent durations. Do not use integers where the unit is indicated as a suffix on the column name. E.g., use `startup_time interval`, not `startup_time_ms integer`. The only exception is durations with nanosecond precision. Since the `interval` type only has millisecond precision, it is acceptable to use `<$name>_ns integer` when necessary (e.g., `delay_ns`).
131132

132133
Naming standards:
133134
* Catalog relation names should be consistent with the user-facing naming and messaging in our docs. The names should not reference internal-only concepts when possible.
134135
* Avoid all but the most common abbreviations. Say `position` instead of `pos`. Say `return` instead of `ret`. Say `definition` instead of `def`.
135-
* We allow three abbreviations at present: `id`, `oid`, and `ip`.
136+
* We allow four abbreviations at present: `id`, `oid`, `ip`, and `url`.
136137
* Use `kebab-case` for enum values. E.g., the `type` of a Confluent Schema Registry connection is `confluent-schema-registry` and the `type` of a materialized view is `materialized-view`. Only use hyphens to separate multiple words. Don’t introduce hyphens for CamelCased proper nouns. For example, the “AWS PrivateLink” connection is represented as `aws-privatelink`.
137138
* Name timestamp fields with an `_at` suffix, e.g., `occurred_at`.
138139
* Do not name boolean fields with an `is_` prefix. E.g., say `indexed`, not `is_indexed`.

doc/user/content/sql/system-catalog/mz_internal.md

+10-10
Original file line numberDiff line numberDiff line change
@@ -688,16 +688,16 @@ the system are restarted.
688688
<!-- RELATION_SPEC mz_internal.mz_source_statistics -->
689689
| Field | Type | Meaning |
690690
| -------------------------|-------------| -------- |
691-
| `id` | [`text`] | The ID of the source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
692-
| `worker_id` | [`uint8`] | The ID of the worker thread. |
693-
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
694-
| `messages_received` | [`uint8`] | The number of messages the worker has received from the external system. Messages are counted in a source type-specific manner. Messages do not correspond directly to updates: some messages produce multiple updates, while other messages may be coalesced into a single update. |
695-
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
696-
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
697-
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
698-
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
699-
| `envelope_state_count` | [`uint8`] | The number of individual records stored in the source envelope state. |
700-
| `rehydration_latency_ms` | [`uint8`] | The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. |
691+
| `id` | [`text`] | The ID of the source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
692+
| `worker_id` | [`uint8`] | The ID of the worker thread. |
693+
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
694+
| `messages_received` | [`uint8`] | The number of messages the worker has received from the external system. Messages are counted in a source type-specific manner. Messages do not correspond directly to updates: some messages produce multiple updates, while other messages may be coalesced into a single update. |
695+
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
696+
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
697+
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
698+
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
699+
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
700+
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |
701701

702702
### `mz_source_statuses`
703703

src/catalog/src/builtin.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2684,8 +2684,8 @@ pub static MZ_SOURCE_STATISTICS: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSourc
26842684
.with_column("updates_committed", ScalarType::UInt64.nullable(false))
26852685
.with_column("bytes_received", ScalarType::UInt64.nullable(false))
26862686
.with_column("envelope_state_bytes", ScalarType::UInt64.nullable(false))
2687-
.with_column("envelope_state_count", ScalarType::UInt64.nullable(false))
2688-
.with_column("rehydration_latency_ms", ScalarType::UInt64.nullable(true)),
2687+
.with_column("envelope_state_records", ScalarType::UInt64.nullable(false))
2688+
.with_column("rehydration_latency", ScalarType::Interval.nullable(true)),
26892689
is_retained_metrics_object: false,
26902690
sensitivity: DataSensitivity::Public,
26912691
});

src/storage-client/src/client.proto

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ message ProtoStorageResponse {
9595
uint64 updates_committed = 6;
9696
uint64 bytes_received = 7;
9797
uint64 envelope_state_bytes = 8;
98-
uint64 envelope_state_count = 9;
99-
optional uint64 rehydration_latency_ms = 10;
98+
uint64 envelope_state_records = 9;
99+
optional int64 rehydration_latency_ms = 10;
100100
}
101101
message ProtoSinkStatisticsUpdate {
102102
mz_repr.global_id.ProtoGlobalId id = 1;

src/storage-client/src/client.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,8 @@ pub struct SourceStatisticsUpdate {
304304
pub updates_committed: u64,
305305
pub bytes_received: u64,
306306
pub envelope_state_bytes: u64,
307-
pub envelope_state_count: u64,
308-
pub rehydration_latency_ms: Option<u64>,
307+
pub envelope_state_records: u64,
308+
pub rehydration_latency_ms: Option<i64>,
309309
}
310310

311311
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
@@ -335,8 +335,11 @@ impl PackableStats for SourceStatisticsUpdate {
335335
packer.push(Datum::from(self.updates_committed));
336336
packer.push(Datum::from(self.bytes_received));
337337
packer.push(Datum::from(self.envelope_state_bytes));
338-
packer.push(Datum::from(self.envelope_state_count));
339-
packer.push(Datum::from(self.rehydration_latency_ms));
338+
packer.push(Datum::from(self.envelope_state_records));
339+
packer.push(Datum::from(
340+
self.rehydration_latency_ms
341+
.map(chrono::Duration::milliseconds),
342+
));
340343
}
341344
}
342345
impl PackableStats for SinkStatisticsUpdate {
@@ -392,7 +395,7 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
392395
updates_committed: update.updates_committed,
393396
bytes_received: update.bytes_received,
394397
envelope_state_bytes: update.envelope_state_bytes,
395-
envelope_state_count: update.envelope_state_count,
398+
envelope_state_records: update.envelope_state_records,
396399
rehydration_latency_ms: update.rehydration_latency_ms,
397400
})
398401
.collect(),
@@ -439,7 +442,7 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
439442
updates_committed: update.updates_committed,
440443
bytes_received: update.bytes_received,
441444
envelope_state_bytes: update.envelope_state_bytes,
442-
envelope_state_count: update.envelope_state_count,
445+
envelope_state_records: update.envelope_state_records,
443446
rehydration_latency_ms: update.rehydration_latency_ms,
444447
})
445448
})

src/storage/src/render/upsert/types.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ where
859859
self.stats
860860
.set_envelope_state_bytes(self.snapshot_stats.size_diff);
861861
self.stats
862-
.set_envelope_state_count(self.snapshot_stats.values_diff);
862+
.set_envelope_state_records(self.snapshot_stats.values_diff);
863863

864864
if completed {
865865
if self.shrink_upsert_unused_buffers_by_ratio > 0 {
@@ -913,7 +913,8 @@ where
913913
self.worker_metrics.upsert_deletes.inc_by(stats.deletes);
914914

915915
self.stats.update_envelope_state_bytes_by(stats.size_diff);
916-
self.stats.update_envelope_state_count_by(stats.values_diff);
916+
self.stats
917+
.update_envelope_state_records_by(stats.values_diff);
917918

918919
Ok(())
919920
}

src/storage/src/statistics.rs

+27-27
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ use std::rc::Rc;
1414

1515
use mz_ore::metric;
1616
use mz_ore::metrics::{
17-
CounterVecExt, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVecExt, IntCounterVec,
17+
CounterVecExt, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVecExt, IntCounterVec, IntGaugeVec,
1818
MetricsRegistry, UIntGaugeVec,
1919
};
2020
use mz_repr::GlobalId;
2121
use mz_storage_client::client::{SinkStatisticsUpdate, SourceStatisticsUpdate};
22-
use prometheus::core::AtomicU64;
22+
use prometheus::core::{AtomicI64, AtomicU64};
2323
use timely::progress::frontier::Antichain;
2424
use timely::progress::Timestamp;
2525

@@ -34,8 +34,8 @@ pub(crate) struct SourceStatisticsMetricsDefinitions {
3434
pub(crate) updates_committed: IntCounterVec,
3535
pub(crate) bytes_received: IntCounterVec,
3636
pub(crate) envelope_state_bytes: UIntGaugeVec,
37-
pub(crate) envelope_state_count: UIntGaugeVec,
38-
pub(crate) rehydration_latency_ms: UIntGaugeVec,
37+
pub(crate) envelope_state_records: UIntGaugeVec,
38+
pub(crate) rehydration_latency_ms: IntGaugeVec,
3939
}
4040

4141
impl SourceStatisticsMetricsDefinitions {
@@ -71,8 +71,8 @@ impl SourceStatisticsMetricsDefinitions {
7171
help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
7272
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
7373
)),
74-
envelope_state_count: registry.register(metric!(
75-
name: "mz_source_envelope_state_count",
74+
envelope_state_records: registry.register(metric!(
75+
name: "mz_source_envelope_state_records",
7676
help: "The number of records in the source envelope state. This will be specific to the envelope in use",
7777
var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
7878
)),
@@ -94,8 +94,8 @@ pub struct SourceStatisticsMetrics {
9494
pub(crate) updates_committed: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
9595
pub(crate) bytes_received: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
9696
pub(crate) envelope_state_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
97-
pub(crate) envelope_state_count: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
98-
pub(crate) rehydration_latency_ms: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
97+
pub(crate) envelope_state_records: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
98+
pub(crate) rehydration_latency_ms: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
9999
}
100100

101101
impl SourceStatisticsMetrics {
@@ -161,9 +161,9 @@ impl SourceStatisticsMetrics {
161161
parent_source_id.to_string(),
162162
shard.clone(),
163163
]),
164-
envelope_state_count: metrics
164+
envelope_state_records: metrics
165165
.source_statistics
166-
.envelope_state_count
166+
.envelope_state_records
167167
.get_delete_on_drop_gauge(vec![
168168
id.to_string(),
169169
worker_id.to_string(),
@@ -317,7 +317,7 @@ impl StorageStatistics<SourceStatisticsUpdate, SourceStatisticsMetrics> {
317317
updates_committed: 0,
318318
bytes_received: 0,
319319
envelope_state_bytes: 0,
320-
envelope_state_count: 0,
320+
envelope_state_records: 0,
321321
rehydration_latency_ms: None,
322322
},
323323
SourceStatisticsMetrics::new(id, worker_id, metrics, parent_source_id, shard_id),
@@ -407,43 +407,43 @@ impl StorageStatistics<SourceStatisticsUpdate, SourceStatisticsMetrics> {
407407
cur.2.envelope_state_bytes.set(value);
408408
}
409409

410-
/// Update the `envelope_state_count` stat.
410+
/// Update the `envelope_state_records` stat.
411411
/// A positive value will add and a negative value will subtract.
412-
pub fn update_envelope_state_count_by(&self, value: i64) {
412+
pub fn update_envelope_state_records_by(&self, value: i64) {
413413
let mut cur = self.stats.borrow_mut();
414-
if let Some(updated) = cur.1.envelope_state_count.checked_add_signed(value) {
415-
cur.1.envelope_state_count = updated;
416-
cur.2.envelope_state_count.set(updated);
414+
if let Some(updated) = cur.1.envelope_state_records.checked_add_signed(value) {
415+
cur.1.envelope_state_records = updated;
416+
cur.2.envelope_state_records.set(updated);
417417
} else {
418-
let envelope_state_count = cur.1.envelope_state_count;
418+
let envelope_state_records = cur.1.envelope_state_records;
419419
tracing::warn!(
420-
"Unexpected u64 overflow while updating envelope_state_count value {} with {}",
421-
envelope_state_count,
420+
"Unexpected u64 overflow while updating envelope_state_records value {} with {}",
421+
envelope_state_records,
422422
value
423423
);
424-
cur.1.envelope_state_count = 0;
425-
cur.2.envelope_state_count.set(0);
424+
cur.1.envelope_state_records = 0;
425+
cur.2.envelope_state_records.set(0);
426426
}
427427
}
428428

429-
/// Set the `envelope_state_count` to the given value
430-
pub fn set_envelope_state_count(&self, value: i64) {
429+
/// Set the `envelope_state_records` to the given value
430+
pub fn set_envelope_state_records(&self, value: i64) {
431431
let mut cur = self.stats.borrow_mut();
432432
let value = if value < 0 {
433433
tracing::warn!(
434-
"Unexpected negative value for envelope_state_count {}",
434+
"Unexpected negative value for envelope_state_records {}",
435435
value
436436
);
437437
0
438438
} else {
439439
value.unsigned_abs()
440440
};
441-
cur.1.envelope_state_count = value;
442-
cur.2.envelope_state_count.set(value);
441+
cur.1.envelope_state_records = value;
442+
cur.2.envelope_state_records.set(value);
443443
}
444444

445445
/// Set the `rehydration_latency_ms` to the given value.
446-
pub fn set_rehydration_latency_ms(&self, value: u64) {
446+
pub fn set_rehydration_latency_ms(&self, value: i64) {
447447
let mut cur = self.stats.borrow_mut();
448448
cur.1.rehydration_latency_ms = Some(value);
449449
cur.2.rehydration_latency_ms.set(value);

test/sqllogictest/autogenerated/mz_internal.slt

+2-2
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,8 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object
389389
6 updates_committed uint8
390390
7 bytes_received uint8
391391
8 envelope_state_bytes uint8
392-
9 envelope_state_count uint8
393-
10 rehydration_latency_ms uint8
392+
9 envelope_state_records uint8
393+
10 rehydration_latency interval
394394

395395
query ITT
396396
SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_source_statuses' ORDER BY position

0 commit comments

Comments
 (0)