Skip to content

split service&instance metrics, serve metrics for build-server & watcher #2038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::env;
use std::fmt::Write;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -14,8 +15,8 @@ use docs_rs::utils::{
remove_crate_priority, set_crate_priority, ConfigName,
};
use docs_rs::{
start_web_server, BuildQueue, Config, Context, Index, Metrics, PackageKind, RustwideBuilder,
Storage,
start_background_metrics_webserver, start_web_server, BuildQueue, Config, Context, Index,
Metrics, PackageKind, RustwideBuilder, Storage,
};
use humantime::Duration;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -107,10 +108,12 @@ enum CommandLine {
/// Starts web server
StartWebServer {
#[arg(name = "SOCKET_ADDR", default_value = "0.0.0.0:3000")]
socket_addr: String,
socket_addr: SocketAddr,
},

StartRegistryWatcher {
#[arg(name = "SOCKET_ADDR", default_value = "0.0.0.0:3000")]
metric_server_socket_addr: SocketAddr,
/// Enable or disable the repository stats updater
#[arg(
long = "repository-stats-updater",
Expand All @@ -122,7 +125,10 @@ enum CommandLine {
cdn_invalidator: Toggle,
},

StartBuildServer,
StartBuildServer {
#[arg(name = "SOCKET_ADDR", default_value = "0.0.0.0:3000")]
metric_server_socket_addr: SocketAddr,
},

/// Starts the daemon
Daemon {
Expand Down Expand Up @@ -154,6 +160,7 @@ impl CommandLine {
subcommand,
} => subcommand.handle_args(ctx, skip_if_exists)?,
Self::StartRegistryWatcher {
metric_server_socket_addr,
repository_stats_updater,
cdn_invalidator,
} => {
Expand All @@ -164,16 +171,22 @@ impl CommandLine {
docs_rs::utils::daemon::start_background_cdn_invalidator(&ctx)?;
}

start_background_metrics_webserver(Some(metric_server_socket_addr), &ctx)?;

docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?;
}
Self::StartBuildServer => {
Self::StartBuildServer {
metric_server_socket_addr,
} => {
start_background_metrics_webserver(Some(metric_server_socket_addr), &ctx)?;

let build_queue = ctx.build_queue()?;
let rustwide_builder = RustwideBuilder::init(&ctx)?;
queue_builder(rustwide_builder, build_queue)?;
}
Self::StartWebServer { socket_addr } => {
// Blocks indefinitely
start_web_server(Some(&socket_addr), &ctx)?;
start_web_server(Some(socket_addr), &ctx)?;
}
Self::Daemon { registry_watcher } => {
docs_rs::utils::start_daemon(ctx, registry_watcher == Toggle::Enabled)?;
Expand Down
10 changes: 5 additions & 5 deletions src/build_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl BuildQueue {
to_process.name, to_process.version
)
});
self.metrics.total_builds.inc();
self.metrics.instance.total_builds.inc();
if let Err(err) =
cdn::queue_crate_invalidation(&mut transaction, &self.config, &to_process.name)
{
Expand All @@ -225,7 +225,7 @@ impl BuildQueue {
.get(0);

if attempt >= self.max_attempts {
self.metrics.failed_builds.inc();
self.metrics.instance.failed_builds.inc();
}

report_error(&e);
Expand Down Expand Up @@ -320,7 +320,7 @@ impl BuildQueue {
"{}-{} added into build queue",
release.name, release.version
);
self.metrics.queued_builds.inc();
self.metrics.instance.queued_builds.inc();
crates_added += 1;
}
Err(err) => report_error(&err),
Expand Down Expand Up @@ -601,8 +601,8 @@ mod tests {

// Ensure metrics were recorded correctly
let metrics = env.metrics();
assert_eq!(metrics.total_builds.get(), 9);
assert_eq!(metrics.failed_builds.get(), 1);
assert_eq!(metrics.instance.total_builds.get(), 9);
assert_eq!(metrics.instance.failed_builds.get(), 1);

// no invalidations were run since we don't have a distribution id configured
assert!(cdn::queued_or_active_crate_invalidations(&mut *env.db().conn())?.is_empty());
Expand Down
2 changes: 2 additions & 0 deletions src/cdn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ pub(crate) fn handle_queued_invalidation_requests(
if let Ok(duration) = (now - row.get::<_, DateTime<Utc>>(0)).to_std() {
// This can only fail when the duration is negative, which can't happen anyways
metrics
.instance
.cdn_invalidation_time
.with_label_values(&[distribution_id])
.observe(duration_to_seconds(duration));
Expand Down Expand Up @@ -400,6 +401,7 @@ pub(crate) fn handle_queued_invalidation_requests(
if let Ok(duration) = (now - row.get::<_, DateTime<Utc>>("queued")).to_std() {
// This can only fail when the duration is negative, which can't happen anyways
metrics
.instance
.cdn_queue_time
.with_label_values(&[distribution_id])
.observe(duration_to_seconds(duration));
Expand Down
2 changes: 1 addition & 1 deletion src/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Pool {
match self.with_pool(|p| p.get()) {
Ok(conn) => Ok(conn),
Err(err) => {
self.metrics.failed_db_connections.inc();
self.metrics.instance.failed_db_connections.inc();
Err(PoolError::ClientError(err))
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/docbuilder/rustwide_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,11 @@ impl RustwideBuilder {

let has_examples = build.host_source_dir().join("examples").is_dir();
if res.result.successful {
self.metrics.successful_builds.inc();
self.metrics.instance.successful_builds.inc();
} else if res.cargo_metadata.root().is_library() {
self.metrics.failed_builds.inc();
self.metrics.instance.failed_builds.inc();
} else {
self.metrics.non_library_builds.inc();
self.metrics.instance.non_library_builds.inc();
}

let release_data = if !is_local {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use self::docbuilder::RustwideBuilder;
pub use self::index::Index;
pub use self::metrics::Metrics;
pub use self::storage::Storage;
pub use self::web::start_web_server;
pub use self::web::{start_background_metrics_webserver, start_web_server};

mod build_queue;
pub mod cdn;
Expand Down
2 changes: 0 additions & 2 deletions src/metrics/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub(super) trait MetricFromOpts: Sized {
fn from_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>;
}

#[macro_export]
macro_rules! metrics {
(
$vis:vis struct $name:ident {
Expand Down Expand Up @@ -82,7 +81,6 @@ macro_rules! metrics {
};
}

#[macro_export]
macro_rules! load_metric_type {
($name:ident as single) => {
use prometheus::$name;
Expand Down
154 changes: 116 additions & 38 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,23 @@ pub const CDN_INVALIDATION_HISTOGRAM_BUCKETS: &[f64; 11] = &[
24000.0, // 240
];

metrics! {
pub struct Metrics {
/// Number of crates in the build queue
queued_crates_count: IntGauge,
/// Number of crates in the build queue that have a positive priority
prioritized_crates_count: IntGauge,
/// Number of crates that failed to build
failed_crates_count: IntGauge,
/// Whether the build queue is locked
queue_is_locked: IntGauge,
/// queued crates by priority
queued_crates_count_by_priority: IntGaugeVec["priority"],

/// queued CDN invalidations
queued_cdn_invalidations_by_distribution: IntGaugeVec["distribution"],
#[derive(Debug)]
pub struct Metrics {
pub instance: InstanceMetrics,
pub service: ServiceMetrics,
}

impl Metrics {
pub fn new() -> Result<Self, prometheus::Error> {
Ok(Self {
instance: InstanceMetrics::new()?,
service: ServiceMetrics::new()?,
})
}
}

metrics! {
pub struct InstanceMetrics {
/// The number of idle database connections
idle_db_connections: IntGauge,
/// The number of used database connections
Expand Down Expand Up @@ -143,7 +144,7 @@ impl RecentlyAccessedReleases {
.insert((version, TargetAtom::from(target)), now);
}

pub(crate) fn gather(&self, metrics: &Metrics) {
pub(crate) fn gather(&self, metrics: &InstanceMetrics) {
fn inner<K: std::hash::Hash + Eq>(map: &DashMap<K, Instant>, metric: &IntGaugeVec) {
let mut hour_count = 0;
let mut half_hour_count = 0;
Expand Down Expand Up @@ -182,18 +183,112 @@ impl RecentlyAccessedReleases {
}
}

impl Metrics {
impl InstanceMetrics {
pub(crate) fn gather(&self, pool: &Pool) -> Result<Vec<MetricFamily>, Error> {
self.idle_db_connections.set(pool.idle_connections() as i64);
self.used_db_connections.set(pool.used_connections() as i64);
self.max_db_connections.set(pool.max_size() as i64);

self.recently_accessed_releases.gather(self);
self.gather_system_performance();
Ok(self.registry.gather())
}

#[cfg(not(target_os = "linux"))]
fn gather_system_performance(&self) {}

#[cfg(target_os = "linux")]
fn gather_system_performance(&self) {
use procfs::process::Process;

let process = Process::myself().unwrap();
self.open_file_descriptors
.set(process.fd_count().unwrap() as i64);
self.running_threads
.set(process.stat().unwrap().num_threads);
}
}

fn metric_from_opts<T: MetricFromOpts + Clone + prometheus::core::Collector + 'static>(
registry: &prometheus::Registry,
metric: &str,
help: &str,
variable_label: Option<&str>,
) -> Result<T, prometheus::Error> {
let mut opts = prometheus::Opts::new(metric, help).namespace("docsrs");

if let Some(label) = variable_label {
opts = opts.variable_label(label);
}

let metric = T::from_opts(opts)?;
registry.register(Box::new(metric.clone()))?;
Ok(metric)
}

#[derive(Debug)]
pub struct ServiceMetrics {
pub queued_crates_count: IntGauge,
pub prioritized_crates_count: IntGauge,
pub failed_crates_count: IntGauge,
pub queue_is_locked: IntGauge,
pub queued_crates_count_by_priority: IntGaugeVec,
pub queued_cdn_invalidations_by_distribution: IntGaugeVec,

registry: prometheus::Registry,
}

impl ServiceMetrics {
pub fn new() -> Result<Self, prometheus::Error> {
let registry = prometheus::Registry::new();
Ok(Self {
registry: registry.clone(),
queued_crates_count: metric_from_opts(
&registry,
"queued_crates_count",
"Number of crates in the build queue",
None,
)?,
prioritized_crates_count: metric_from_opts(
&registry,
"prioritized_crates_count",
"Number of crates in the build queue that have a positive priority",
None,
)?,
failed_crates_count: metric_from_opts(
&registry,
"failed_crates_count",
"Number of crates that failed to build",
None,
)?,
queue_is_locked: metric_from_opts(
&registry,
"queue_is_locked",
"Whether the build queue is locked",
None,
)?,
queued_crates_count_by_priority: metric_from_opts(
&registry,
"queued_crates_count_by_priority",
"queued crates by priority",
Some("priority"),
)?,
queued_cdn_invalidations_by_distribution: metric_from_opts(
&registry,
"queued_cdn_invalidations_by_distribution",
"queued CDN invalidations",
Some("distribution"),
)?,
})
}

pub(crate) fn gather(
&self,
pool: &Pool,
queue: &BuildQueue,
config: &Config,
) -> Result<Vec<MetricFamily>, Error> {
self.idle_db_connections.set(pool.idle_connections() as i64);
self.used_db_connections.set(pool.used_connections() as i64);
self.max_db_connections.set(pool.max_size() as i64);
self.queue_is_locked.set(queue.is_locked()? as i64);

self.queued_crates_count.set(queue.pending_count()? as i64);
self.prioritized_crates_count
.set(queue.prioritized_count()? as i64);
Expand All @@ -216,23 +311,6 @@ impl Metrics {
}

self.failed_crates_count.set(queue.failed_count()? as i64);

self.recently_accessed_releases.gather(self);
self.gather_system_performance();
Ok(self.registry.gather())
}

#[cfg(not(target_os = "linux"))]
fn gather_system_performance(&self) {}

#[cfg(target_os = "linux")]
fn gather_system_performance(&self) {
use procfs::process::Process;

let process = Process::myself().unwrap();
self.open_file_descriptors
.set(process.fd_count().unwrap() as i64);
self.running_threads
.set(process.stat().unwrap().num_threads);
}
}
2 changes: 1 addition & 1 deletion src/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<'a> StorageTransaction for DatabaseStorageTransaction<'a> {
SET mime = EXCLUDED.mime, content = EXCLUDED.content, compression = EXCLUDED.compression",
&[&blob.path, &blob.mime, &blob.content, &compression],
)?;
self.metrics.uploaded_files_total.inc();
self.metrics.instance.uploaded_files_total.inc();
}
Ok(())
}
Expand Down
Loading